jasper的技术小窝

关注DevOps、运维监控、Python、Golang、开源、大数据、web开发、互联网

elasticsearch源码分析之Discover模块

作者:jasper | 分类:ElasticSearch | 标签:   | 阅读 2459 次 | 发布:2015-11-20 11:13 p.m.

下面我们来谈谈elasticsearch中的Discover模块,这个模块主要是用作处理elasticsearch中的集群问题,是elasticsearch中比较复杂的一个模块了吧。该模块在elasticsearch中分为两个,分别是LocalDiscover和ZenDiscover,在这里我们只说后者,因为在我们的使用中一般也是用的ZenDiscover,这也是elasticsearch默认的。

粗略介绍一下ZenDiscover

ZenDiscover其实是有一些其他的模块配合而成的,比如node之间的通信就是用的transport模块(这个本篇文章先不介绍)。ZenDiscover可以分为以下几个子模块:

  • Ping:由于node之间互相发现的;
  • Unicast:通过配置discovery.zen.ping.unicast对列表中的node单播;
  • Master Election:用作Master的选举;
  • Fault Delection:用ping的方式来确定node是否在集群里面;
  • Cluster state updates:这个指在master node上有效,用于向其他node发布集群的状态信息;
  • No master block:通过配置discovery.zen.no_master_block来决定当没有active的master时,是否拒绝某些操作;

ZenDiscover的初始化

初始化的时候会加载我上段提到的几个模块,我就不再重复了,值得注意的是Fault Delection的分为两个masterFD和nodesFD;其次还加载了一些对于discover的配置,简单地列出几个:

  • pingTimeout:ping时候的timeout,默认为3s;
  • joinTimeout:当一个新的node加入集群时,将会发个join的request到master,这个request的timeout即joinTimeout,默认为pingTimeout的20倍;
  • joinRetryAttempts:join重试的次数,默认为2次;
  • joinRetryDelay:重试的间隔,默认为100ms;
  • maxPingsFromAnotherMaster:容忍其他master发出的,在强制其他或是本地master rejoin之前的次数;
  • masterElectionWaitForJoinsTimeout: master选举时等待join的timeout,默认是joinTimeout的一半;

其中joinRetryAttempts和maxPingsFromAnotherMaster是一定要大于等于1的。

ZenDiscover的运行

其实ZenDiscover的运行就是几个子模块的运行;下面我们来一一说说它们:

joinThreadControl

首先要将自己加入集群,加入集群的thread是如何工作的呢,我们来详细看一看:

一、首先是findMaster,findMaster通过pingService来实现,在这里我们只探讨unicast的ping的方式,主要的实现在sendPings这个方法里面,让我们来详细看看这个方法是怎么ping的。

1、首先确定ping发送的目标node:

可以看到由三个部分组成:第一部分是其实是由clusterService.state().nodes();而得来,也就是拥有同一cluster.name的这些node;第二部分是hostsProviders自动生成的hosts列表;第三部分是所以可能作为master的node。

2、对要发送的node进行排序,这个排序的依据是根据可能成为master的概率来的。(为什么要排序?)利用快排将master node排在前面,data node排在后面;在加入配置中提供的hosts的列表,即discovery.zen.ping.unicast;

3、分别对nodes列表里面的node发送ping,如果transport连接在,将会发送一个ping request到目标node,发送的逻辑最终其实落在了TransportService中,再具体来说,是在NettyTansport中,具体的逻辑这里先不讲,放在后面的文章中专门讲Transport。如果transport连接不在了,会先connectToNode建立连接,然后再发送ping request。注意为了保证不会让一个node断掉连接,在ping的时候会重新加入DiscoveryNodes。

4、对ping的结果进行筛选,如果masterElectionFilterClientNodes设为了true的话,则这里会排除只是client的node,或是只是data的node,在剩下的node中选出可以作为master的node,注意这里要排除localhost,是为了避免每个都将自己作为master而产生脑裂;

5、这里会创建两个node列表,分别是activeNodes和joinedOnceActiveNodes,前者表示所有active的node,后者表示集群中之前就存在的node。

6、 如果从之前的ping中没有发现master,表示要重新选举了;首先判断active的master node是否大于配置中的minimumMasterNodes,不然是选举不了master的。ok,若满足minMaster,会优先从joinedOnceActiveNodes中选举出master;若选不出才会从activeNodes中去选举。选举的逻辑其实超级简单的, 先排序,在排序时会remove掉不能作为master的node,排序完后将第一个作为master:

7、如果ping中发现了master,则返回这个node。

二、在选举出master之后,node会根据自己不同的角色不同干不同的事情;

如果当前node是master,则会启动nodesFD,nodesFD启动一个schedulethreadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, fd);去ping其他的node,相当于heartbeat,如果检测失败会被踢出集群nodesFD.remove(node, NodeFD.this)

如果当前node不是master,会向master发出一个join的request,注意这里并不会开启一个master fault detection;因为这一部分在processNextPendingClusterState中实现。

publishClusterState

顾名思义就是将集群的state发送到其他节点上,这个只有master才有的权利,那么需要发布的state有哪些呢?如下图:

注意在elasticsearch2.0之后默认发送state采用diff的方式,当然你也可以通过配置来设置discovery.zen.publish_diff.enable是否使用diff。集群的state的描述都在ClusterState中,我们可以来看看这个ClusterState都包含哪些东西呢?

    private final long version;

    private final String stateUUID;

    private final RoutingTable routingTable;

    private final DiscoveryNodes nodes;

    private final MetaData metaData;

    private final ClusterBlocks blocks;

    private final ImmutableOpenMap<String, Custom> customs;

    private final ClusterName clusterName;

包含了routing、nodes、MetaData等信息;其中最为重要的就是MetaData了,这个MetaData包含:

    private final Settings transientSettings;
    private final Settings persistentSettings;
    private final Settings settings;
    private final ImmutableOpenMap<String, IndexMetaData> indices;
    private final ImmutableOpenMap<String, IndexTemplateMetaData> templates;
    private final ImmutableOpenMap<String, Custom> customs;

    private final transient int totalNumberOfShards; // Transient ? not serializable anyway?
    private final int numberOfShards;

    private final String[] allIndices;
    private final String[] allOpenIndices;
    private final String[] allClosedIndices;

    private final SortedMap<String, AliasOrIndex> aliasAndIndexLookup;

可以看到,index、template、settings等等都包含在其中了,是不是和之前图中所示的大体差不多,那么我们是怎么拿到这个diff的呢。其实是有维护两个ClusterState实例,分别叫做state和previousState,通过diff方法来获得不同的部分。(要特别说明一下,若是即将发布的node是新加入的,即previousState中维护的nodes不包含改node,则依然是发送全量的state),diff的实现其实就是对state里面的每一项作比较:

最终归根到底是对map做diff:

代码很简单,不多做解释了。

接下来就是要将这些state发送到各个node上去了,首先会对state做压缩,然后利用transport将之下发下去。其中下发的控制有sendingController来完成,它会控制timeout、commit等。

MasterFaultDetection && NodesFaultDetection

MasterFaultDetection从代码上来看是所有node都会运行的,而NodesFaultDetection只运行在master node上。

ok,我们先来看看MasterFaultDetection:
启动一个线程去向master发送ping,

threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger);

其中pingInterval由配置discovery.zen.fd.ping_interval来决定,默认的interval为1s。

当ping失败的时候,node会发一个rejoin的请求,请求重新加入集群;当master收到请求后,会回复一个TransportResponse.Empty的response,node收到后会再回复一个MasterPingResponseResponse给master,这样rejoin成功,master会将新的cluster state发布到集群的所有node。

再来看看NodesFaultDetection:
如果本node是master,而它又接收到来自其他node的再来看看NodesFaultDetection,那么说明集群里有两个master,一般称之为“脑裂”,遇到这种情况时,node会比较自己的version和另外master的version;如果比别人的小,就会stop掉自己的nodesFD和masterFD,然后rejoin进集群,开始joinThreadControl干的那些事儿。如果version比别人的大,那么当前node仍然是master,它会向另一个master发出一个指令,“hey,伙计,现在我才是大哥,你是伪造的,重新rejoin吧”,剩下的事情同样就是之前说的rejoin的事情了,在此不再重复。

总结

Discover的内容大体就是这么多,可以看到,这个模块主要的作用是维护这个集群的状态,并负责state的下发,master的选举,配置的更新等等,是elasticsearch中较为核心的一个模块了。


转载请注明出处:http://www.opscoder.info/es_discover.html

其他分类: