jasper的技术小窝

关注DevOps、运维监控、Python、Golang、开源、大数据、web开发、互联网

elasticsearch源码分析之Gateway模块

作者:jasper | 分类:ElasticSearch | 标签:   | 阅读 3131 次 | 发布:2015-12-21 10:33 p.m.

周末看了两场NBA之后,又要来开始继续研究Elasticsearch的源代码,今天我们来一起看看Gateway模块,Gateway模块用于存储es集群的MetaData。MetaData每一次改变(比如增加删除索引等),都要通过Gateway模块进行持久化。当集群第一次启动的时候,这些信息就会从Gateway模块中读出并应用。

一般情况下,Gateway我们都设置为Local方式,即将数据存储在本地,那么本文也就主要针对这种方式来说。其实在Elasticsearch中生成的数据文件一般来说主要有三种,分别是state、index和translog,后两种想必大家也都知晓,那么state主要就是Gateway存储数据的文件。

Gateway的写入

那么我们首先来看看Gateway的写入,其实Gateway是继承自ClusterStateListener,所以说只要有集群状态的变化就会触发相应的动作,这个动作就是clusterChanged,而这个变化就是ClusterChangedEvent,首先会判断当前cluster是否block了,如果是的话会重置当前metadata,相当于回到初始状态。如果没有block则继续,只有在当前node是master或是data的情况下才会做写入Gateway的操作,判断是否metadata写入当前node,需要通过查看shard routing,且仅当这index的shard被分配在此节点上才会写入。

但是,已经关闭的index不会出现在shard routing中。如果已关闭索引的metadata被更新了,会从磁盘上load这个index的metadata,并将之添加到previouslyWrittenIndices中,

ImmutableSet.Builder<String> previouslyWrittenIndicesBuilder = ImmutableSet.builder();
for (IndexMetaData indexMetaData : newMetaData) {
    IndexMetaData indexMetaDataOnDisk = null;
    if (indexMetaData.getState().equals(IndexMetaData.State.CLOSE)) {
        indexMetaDataOnDisk = metaStateService.loadIndexState(indexMetaData.getIndex());
    }
    if (indexMetaDataOnDisk != null) {
        previouslyWrittenIndicesBuilder.add(indexMetaDataOnDisk.getIndex());
    }
}
previouslyWrittenIndices = previouslyWrittenIndicesBuilder.addAll(previouslyWrittenIndices).build();

上面的loadIndexState就是从Gateway的文件中获取最新的state,然后需要将当前这个变化涉及到的index拿到,再才能生成最后要写入磁盘的数据,也即下面的writeInfo:

relevantIndices = getRelevantIndices(event.state(), event.previousState(), previouslyWrittenIndices);
writeInfo = resolveStatesToBeWritten(previouslyWrittenIndices, relevantIndices, previousMetaData, event.state().metaData());         

下面才是真正写入磁盘的逻辑,写入的文件的位置为STATE_DIR_NAME = "_state",文件后缀是STATE_FILE_EXTENSION = ".st",如果文件不存在的话会自动创建,state文件会先序列化为一临时文件,然后自动转移为格式为{prefix}{version}.st的目标文件。要遍历这些需要写入的index,分别写入到指定的位置,这里的location其实是nodeEnv.indexPaths(new Index(indexMetaData.getIndex()))来生成的。

for (int i = 1; i < locations.length; i++) {
    stateLocation = locations[i].resolve(STATE_DIR_NAME);
    Files.createDirectories(stateLocation);
    Path tmpPath = stateLocation.resolve(fileName + ".tmp");
    Path finalPath = stateLocation.resolve(fileName);
    try {
        Files.copy(finalStatePath, tmpPath);
        Files.move(tmpPath, finalPath, StandardCopyOption.ATOMIC_MOVE); // we are on the same FileSystem / Partition here we can do an atomic move
        IOUtils.fsync(stateLocation, true); // we just fsync the dir here..
    } finally {
        Files.deleteIfExists(tmpPath);
    }
}

写入完成之后,还需要做一件事,就是根据metadata处理dangling的index(就是指存在于磁盘上,但是在集群的metadata里面没有的index),将他们重新引入集群,这里需要从三步来走:

1、如果提供的metadata已经存在了,就清除掉dangling的index;
2、找到新的dangling的index,并加入;
3、将现在dangling中的index发送给master节点,用作allocation:

cleanupAllocatedDangledIndices(metaData);
findNewAndAddDanglingIndices(metaData);
allocateDanglingIndices();

state的Rrecover

其实这里就是来说Gateway在state做恢复的时候的表现了,当clusterChanged的时候也会触发checkStateMeetsSettingsAndMaybeRecover,那么其实这个方法是判断是否要Recovery,判断的条件由这么一堆的配置文件来决定:

  • gateway.recover_after_nodes:集群在达到多少个节点的规模后,才开始数据恢复任务,默认值是-1;
  • gateway.recover_after_data_nodes:集群在达到多少个数据节点的规模后,才开始数据恢复任务,默认值是-1;
  • gateway.recover_after_master_nodes:集群在达到多少个master节点的规模后,才开始数据恢复任务,默认值是discovery.zen.minimum_master_nodes,如果这个用户没有配置的话,那么也是-1;

当集群没有global block,并且满足上面的三个条件(注意这里是“并”的关系)时,还得先来通过相关配置来决定是否马上recovery:

  • gateway.recover_after_time:集群多久之后才开始恢复,当下面三个有一个有设置时,此项默认是5min;
  • gateway.expected_nodes:集群节点数达到多少时,立即recovery,不用等待上面的时间配置;
  • gateway.expected_data_nodes:略
  • gateway.expected_master_nodes:略

如果不是立即recovery,会启动一个GENERIC类型(上一篇文章有说啊)的threadPool.schedule,等时间到了再recovery,真正recovery的实现在performStateRecovery里面完成。在该方法中会获得各个节点汇集过来的state信息,然后找到各个节点中version最高的index state信息作为electedGlobalState,如果version都一样,就用第一个。与此同时,也会获取系统中有多少index,每个index的具体配置是什么情况(包括分配多少shard、每个shard多少副本等)然后计算路由;通过配置gateway.initial_meta(没有设置默认就是gateway.local.initial_meta,仍然没有设置就是discovery.zen.minimum_master_nodes,还是没有设置那么默认就为“1”),来获取requiredAllocation的数目,具体的逻辑为:

requiredAllocation的数目必须不大于集群现在已发现的节点数,接着遍历所有的index,并从之前收集到的state信息中中获取到该index的metadata,同样的要选一个作为electedIndexMetaData,选version最大的,如果都一样就选第一个,并以此作为该index的metadata,然后build出ClusterState。注意在恢复meta过程中,所有操作都将堵塞,为了避免和集群真实的meta产生冲突。

Gateway的allocate

这一部分我并不打算在这里说,我把它放到下一篇中详细去介绍整个allocate过程的时候再说,先跳过。

总结

上面也是简要介绍了local gateway的工作方式,对于上面对gateway的配置,为了保证集群自动发现的初期,meta信息完整,建议适当设置gateway.recover_after_nodes的值。那么在下一篇中会重点看看数据的allocation,又是一块硬骨头啊。


转载请注明出处:http://www.opscoder.info/es_gateway.html

其他分类: