jasper的技术小窝

关注DevOps、运维监控、Python、Golang、开源、大数据、web开发、互联网

elasticsearch源码分析之索引过程(一)

作者:jasper | 分类:ElasticSearch | 标签:   | 阅读 4913 次 | 发布:2015-11-28 10:41 p.m.

接着上一篇说了发送端的情况,现在我们来聊聊ES数据接收端都干了些啥,ES是怎么把数据插进去的。

既然是接着上一篇所说的,那么我们还是以transport的bulk为入口来探究,对于单个document的发送就忽略了。

索引创建

上一篇其实已经说到了,一个bulk请求其实就是add了很多request的实例,组成了一个BulkRequest,在服务端接收到其实是一个bulkRequest的实例,拿到这个实例之后,会根据配置action.auto_create_index来决定是否可以自动创建索引(默认是可以的),若是可以创建,会遍历所有的request取得其中包含的index和type,然后在遍历这些index和type,如果集群中不存在相应的index和type,则创建(创建过程这里先不说)之,完成之后才开始真正的bulk执行过程。

接收node对bulk的处理

1、首先会判断集群的是否block了读操作(在一些特定情况下集群会block一些操作,这里先不说),如果是blocked,就会立即返回错误,这里代码里添加了一个TODOTODO use timeout to wait here if its blocked...,看来作者后面是打算用timeout来控制,而不是直接报错啊。

2、遍历所有的request,对其做一些加工,主要包括:获取routing(如果mapping里有的话)、指定的timestamp(如果没有带timestamp会使用当前时间),如果没有指定id字段,在action.bulk.action.allow_id_generation配置为true的情况下,会自动生成一个base64UUID作为id字段,并会将request的opType字段置为CREATE,因为如果是使用es自动生成的id的话,默认就是createdocument而不是updatedocument,这个我们后面再说。(注:坑爹啊,我从github上面下的最新的ES代码,发现自动生成id这一段已经没有设置opType字段了,看起来和有指定id是一样的处理逻辑了,这不和文档说得不一样么,checkout到release版本才有的,看来后面是要对自带id做优化了,因为这对写入性能的影响还挺大的,我会持续关注)。

3、创建一个shardId--> Operation的Map>,再次遍历所有的request,获取获取每个request应该发送到的shardId,获取的过程是这样的:request有routing(上面2中获取到的)就直接返回,如果没有,会先对id求一个hash,这里的hash函数默认是Murmur3,当然你也可以通过配置index.legacy.routing.hash.type来决定使用的hash函数,下面这个才最终决定发到哪个shard:

return MathUtils.mod(hash, indexMetaData.getNumberOfShards());

即用hash对shard的总数求模来获取shardId,将shardId作为key,通过遍历的index和request组成BulkItemRequest的集合作为value放入之前说的map中(为什么要拿到遍历的index,因为在bulk response中可以看到对每个request的请求处理结果的,所以这个顺利很重要的),其实说了这么多就是要对request按shard来分组。

4、遍历上面得到的map,对不同的分组创建一个bulkShardRequest,包含配置consistencyLevel和timeout。并从集群state中获得primary shard,如果primary shard不是active的会有retry机制。如果primary在本机就直接执行,如果不在会再发送到其shard所在的node。

5、checkWriteConsistency,根据配置的consistencyLevel来核对active的shards数是否符合,如果重试后仍然不符合就会报错返回。

写入primary

接上面所说,在primary上面看看数据是怎么写入的:

1、在primary的节点上,遍历上文生成的BulkItemRequest中的request,分别对每一个request做处理(这里包括生成uid,对nested、parent-child类型做处理等等)。首先要确定这个request的operation是什么,根据上文中的opType字段来分为INDEXCREATE两种operation。

2、接下来是是要解析request中的内容,来看看是否需要更新mapping,如果需要更新,则会同步地首先在master上面更新,然后再由master传播到集群的其他节点。(注意这里为什么一定要同步?为了确保在数据插入之前mapping与数据匹配。为什么一定要先到master?因为只有master有权利更改mapping,虽然这样会影响效率,但是可以避免mapping冲突。)这里虽然是同步但是并不返回结果,而是完成后会再次check是否需要更新mapping,如果结果仍然说需要更新,说明之前的更新并没有成功会报错“Dynamics mappings are not available on the node that holds the primary yet”;

3、首先来看看直接CREATE的:

3.1、先确保当前这个写操作是允许的,只有在所写shard的状态是STARTEDRELOCATED才会被允许。再记录下最后的写入时间。

3.2、接下来在写入之前会首先执行一些listener,我瞅了一眼现在ES只提供了Percolator这一种,Percolator就是在数据写入之前执行一些定义好的action,具体的内容读者自行阅读官方文档。

3.3、从engineFactory中获取一个newReadWriteEngine实例,里面包含一个lucene中的IndexWriter。

3.4、因为写入时并发的,所以这里对于每一笔写入都加了锁,synchronized (dirtyLock(create.uid()))用uid来判别,防止写入脏数据。然后会获取当前的version,关于version,用户是可以发送的时候自己指定的,也可以自动生成(默认从1开始)。

3.5、开始写入索引indexWrite:indexWriter.updateDocuments(create.uid(), create.docs());具体细节由lucence去做了。

3.6、写入translog,translog中的内容以后再说。

4、再来看看INDEX的:

其实和上面的CREATE相比,只是在indexWrite之前增加了一项version lookup;这项操作其实是将uid作为一个term去索引里面查找version的值,同时会优化频繁更新的document,其实就是把这些document放在最后面的segments中。根据是否查到version来决定是add还是update: 这里的update其实是将原来的document置为delete状态,并将新的document插入,merge的时候会将delete状态的document删除掉。

5、完成数据索引之后,如果请求里面要求refresh,则会立即refresh,如果要求立即将translog写入storage也会立即执行。

至此,primary shard上面的数据完成。

写入replica

现在开始写入replica,这里会根据shard的副本数来决定写不写以及写多少。如果replica shard是unassigned状态就会直接跳过.先要获取到replica应该要写入的node,同样的,如果是本地就直接写,如果是其他node就发送过去。注意replica的写入不是一定要完成的,比如你发现要写入的node不在了,那么直接返回即可。因为ES还有relocating等机制来完成replica。

其实后面的逻辑就和上面primary的是一样的了,就不一一重复了。

到这里,bulk所以工作完成,返回给client,大功告成!!!

单条数据索引

上面所说的都是bulk的数据索引,单条document的索引其实也是类似的,过程也是差不多的,就不啰嗦了。

结语

可以看到其实es的数据的写入是需要做大量的工作的,所以为了增加单位时间的写入量,bulk size要设置一个合理的值;尽量不要做自定义id;如果可以不用replica,可以设为0…………

其实这此为止,只是将数据写入了buffer和translog里面,后面还有很长的路要走,我们下篇再说。


转载请注明出处:http://www.opscoder.info/es_indexprocess1.html

其他分类: