jasper的技术小窝

关注DevOps、运维监控、Python、Golang、开源、大数据、web开发、互联网

elasticsearch源码分析之Allocation模块

作者:jasper | 分类:ElasticSearch | 标签:   | 阅读 2627 次 | 发布:2016-01-02 9:50 p.m.

对于shard分配到哪个node,之前也提到过了,是由ES自己控制或是由routing来控制,但是在一些情况下会有shard的分配动作,ES中将之称之为allocate,下面我们就来探讨一下allocate相关的知识。

那么在哪些情况下会有shard的allocate的动作呢?一般说来会有下面几个导致:

  1. index的增减;
  2. node的增减;
  3. 执行了reroute操作;
  4. replica的设置更改;

那么allocate的策略是怎样的呢?allocate的过程又是怎么样的呢,下面一一道来;

AllocationDecider

实际上,allocate的策略是有些复杂的,在ES中使用一个叫做AllocationDecider的模块来控制,其中描述decider的类型有四种,分别是:

public static final Decision ALWAYS = new Single(Type.YES);
public static final Decision YES = new Single(Type.YES);
public static final Decision NO = new Single(Type.NO);
public static final Decision THROTTLE = new Single(Type.THROTTLE);

看到单词也就明白所表达的意思了,再次不再赘述,而这个AllocationDecider分为很多种类,也有不同的配置参数,这些AllocationDecider包括下面几个:

public static final List<Class<? extends AllocationDecider>> DEFAULT_ALLOCATION_DECIDERS =
    Collections.unmodifiableList(Arrays.asList(
        SameShardAllocationDecider.class,
        FilterAllocationDecider.class,
        ReplicaAfterPrimaryActiveAllocationDecider.class,
        ThrottlingAllocationDecider.class,
        RebalanceOnlyWhenActiveAllocationDecider.class,
        ClusterRebalanceAllocationDecider.class,
        ConcurrentRebalanceAllocationDecider.class,
        EnableAllocationDecider.class, 
        DisableAllocationDecider.class,
        AwarenessAllocationDecider.class,
        ShardsLimitAllocationDecider.class,
        NodeVersionAllocationDecider.class,
        DiskThresholdDecider.class,
        SnapshotInProgressAllocationDecider.class));

这些decider都是继承自AllocationDecider,该base class包含以下几种方法:(默认都是ALWAYS)

  • canRebalance:给定的shard routing是否可以rebalance;
  • canAllocate:给定的shard routing是否可以分配到指定的node;
  • canRemain:给定的shard routing是否可以继续保留在指定的node;

那么现在就来依次看看上面说到的各种decider:

SameShardAllocationDecider:防止同一个shard的多个实例(也即primary和replica)被分配到了相同的node上,判断是否是相同的node使用的是node的host name和host address;通过配置cluster.routing.allocation.same_shard.host来决定是否允许做这个check,默认是false,注意这个设置只有在同一个host上运行多个ES实例时有效,而同一node上的相同shard的多个replica不被允许被分配是独立于这个配置的。

FilterAllocationDecider:明确地设置shard可以或是不能分配到此node上,通过可以运行时动态更改的配置:

  • index.routing.allocation.require.*
  • index.routing.allocation.include.*
  • index.routing.allocation.exclude.*
  • cluster.routing.allocation.require.*
  • cluster.routing.allocation.include.*
  • cluster.routing.allocation.exclude.*

来决定,其中require表示必须,include表示允许,exclude表示禁止。注意Cluster的设置会重载掉index的配置,意味着如果根据index的配置该shard可以分配到此node,但是cluster的配置是不允许,那么此shard将不允许。filter被应用的顺序依次为required、include、exclude。

ReplicaAfterPrimaryActiveAllocationDecider:只有在主分片active状态下,副本才可以分配。

ThrottlingAllocationDecider:这个主要生效在recovery过程中,主要由下面的可以通过api来更改的配置来决定:

  • cluster.routing.allocation.node_initial_primaries_recoveries:在recovery时,同一个node上允许同时恢复几个主分片个数,默认是 4 个;
  • cluster.routing.allocation.node_concurrent_recoveries:限制在同一个node上,除了主分片重启恢复以外其他shard做recovery的并发数,默认是 2 个;
  • cluster.routing.allocation.concurrent_recoveries:和上一条配置意思差不多,但是当有次配置时会覆盖上一条的配置;

如果上面的阀值有一个被超过了,该allocation decider会返回THROTTLE,用以限制allocate的过程,以阻止太多的recovery京城导致node过载;

RebalanceOnlyWhenActiveAllocationDecider:在所有shard都处在active状态下,才可以执行rebalance操作;

ClusterRebalanceAllocationDecider:通过集群中active的shard状态来决定是否可以执行rebalance,通过可运行时修改的配置cluster.routing.allocation.allow_rebalance来决定,此配置有三个可选的value,分别是:

  • indices_primaries_active:只有所有的primary的shard都是active状态是才允许rebalance;
  • indices_all_active:只有所有的shard都是active状态是才允许rebalance;
  • always:无条件允许做rebalance;

默认值是indices_all_active。

ConcurrentRebalanceAllocationDecider:通过并发数来控制rebalance,配置cluster.routing.allocation.cluster_concurrent_rebalance来控制,该配置运行时可变,默认值为2,如果设置为-1,则表示无限制并发;

EnableAllocationDecider:是否允许分配。可以在cluster级别设置,也可以在index级别设置,index级别的设置会覆盖cluster级别的设置:

cluster.routing.allocation.enable、index.routing.allocation.enable、cluster.routing.rebalance.enable、index.routing.rebalance.enable

可能的值有四个,分别为NONE、NEW_PRIMARIES(allocation特有)、REPLICAS(rebalance特有)、PRIMARIES、ALL,设置不区别大小写,默认值都为ALL;

DisableAllocationDecider:该策略已经被标示为Deprecated,所以不做解释;

AwarenessAllocationDecider:通过有关属性awareness的键值对的配置来控制allocate,Awareness基于像node或是物理机架的位置显式地控制replica的allocate,Awareness属性接收任意配置的key,举个例子来说吧,比如你配置了cluster.routing.allocation.awareness.attributes: rack_id,意味着你希望一个shard的多个replica都分布在rack_id相同的node上,所以你还需要有这样的配置如node.rack_id:1。这里还有一个配置cluster.routing.allocation.awareness.force.*,接着上面的例子配置一个cluster.routing.allocation.awareness.force.rack_id.values: 1,2;意思是rack_id为1和2的都可以被分配到同样的replica。注意在正确的设置状态下,force中设置的对应值应该包含在attributes的值域中。

ShardsLimitAllocationDecider:限制一个节点上允许的同一个index的shard数目。设置项同样有cluster和index级别的两种:index.routing.allocation.total_shards_per_node、cluster.routing.allocation.total_shards_per_node,同样index可覆盖cluster,默认值为null,意味着不限制,如果设置为负数也是不限制。注意如果动态减小这个配置,会触发数据迁移和对集群节点产生额外的负载。

NodeVersionAllocationDecider:根据节点的版本新旧来决定是否allocate,如果target node的版本等于或是大于source的版本,则允许,否则就不允许。一般情况下一个集群中是不会出现不同版本共存的情况的,但是在做ES逐个升级的时候,就可能出现。

DiskThresholdDecider:根据硬盘剩余空间来决定是否继续分配,涉及的配置项有:

  • cluster.routing.allocation.disk.threshold_enabled:是否开启disk阀值策略,默认然是true(代码里comment里面写的是false,代码里是true,看来是作者大意了);
  • cluster.routing.allocation.disk.watermark.low:达到这个值(默认为85%),新索引分片就不会再分配到这个节点上了,也可以设置具体的byte大小;
  • cluster.routing.allocation.disk.watermark.high:达到这个值(默认为90%),会触发该节点现存分片的数据rebalance,把数据挪到其他节点上去,也可以设置具体的byte大小;

其实ES是每隔cluster.info.update.interval都会去探测一次node的size,默认interval是30s。当分别达到设置值时也会有不同的动作,如果达到low但不到high,replica将分配不到该node,如果达到了high,shard将会被移出当前node。

这里还配置需要说明一下:cluster.routing.allocation.disk.include_relocations:在计算disk的时候是否将要relocation到此node的包括在内,默认是true。cluster.routing.allocation.disk.reroute_interval:配置当超过阀值时reroute的间隔,默认是1min。

SnapshotInProgressAllocationDecider:控制snapshot期间是否允许分配。配置项为cluster.routing.allocation.snapshot.relocation_enabled,默认值是false,这里需要说明一下,因为snapshot只针对primary,所以这里的限制也只限于primary,对于replica无效。

所有的AllocationDecider就这么多,当发生allocate的时候会根据这些decider来分配到相应的node,那么我们接下来看看allocate的过程。

allocate过程

allocate的触发来自RoutingService中的clusterChanged中,当然也不是一有触发就会立即执行,这里有一个配置叫做index.unassigned.node_left.delayed_timeout决定由于node离开集群unassigned的shard需要延时多久才分配,默认是1min,如果你想立即分配可以将之设为0;这样的设计是为了避免一个节点离开集群后又很快加入,而带来的网络消耗。所以当有unassigned产生时,都会记下当前的时间作为unassignedShardsAllocatedTimestamp,然后在allocate的时候计算一下delay(即是超过了上面配置的时间),只要有delay的shard,就会执行reroute("assign delayed unassigned shards");操作。还得现在没有正在执行的reroute,才会执行。

真正的操作来自于AllocationService的reroute中,步骤如下:

1、 首先要清除掉要allocate之前所在的node,当然如果现在发现其活过来了的话,就不清除;清除的时候会遍历其上的所有shard,对这些shard做一个applyFailedShard,其实判断这些shard是否需要relocation,在该shard处于initializing状态,或是这个shard本来就是当前routing shard的一个copy时(这时会把shard重新置为unassigned),将会取消relocation:routingNodes.cancelRelocation(shardRouting);;

2、 其次将新的node加入到routingNodes中,这个是在有node加入时会执行的:applyNewNodes;

3、 在分配之前会选举出primary,如果是primary failed,会从其replica中选出一个作为primary:electPrimariesAndUnassignedDanglingReplicas;大体逻辑是随机获取一个作为候选shard:

for (ShardRouting shardRouting : assignedShards(shard.shardId())) {
    if (!shardRouting.primary() && shardRouting.active()) {
        return shardRouting;
    }
}

只要是不是primary且active的都行,然后分别使用shard.moveToPrimary();来使之为primary,

4、 现在才是将unassigned的shard分配到可用的node上,这里呢又分为两种allocation,分别是gateway的和真正数据的。

对于Gateway的allocate(上一篇已经埋好伏笔说这一篇来讲的),分为primaryShardAllocatorreplicaShardAllocator:

changed |= primaryShardAllocator.allocateUnassigned(allocation);
changed |= replicaShardAllocator.processExistingRecoveries(allocation);
changed |= replicaShardAllocator.allocateUnassigned(allocation, lastAllocateUnassignedRun);

primaryShardAllocator中通过buildNodesToAllocate来获取所有可能allocate到的node,当然是利用了deciders:

Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision.type() == Decision.Type.THROTTLE) {
    throttledNodes.add(discoNode);
} else if (decision.type() == Decision.Type.NO) {
    noNodes.add(discoNode);
} else {
    yesNodes.add(discoNode);
}

然后会选其中的第一个作为target:DiscoveryNode node = nodesToAllocate.noNodes.get(0);,再initialize当前shard,并从unassigned列表中移除;具体的initialize过程这里不讲,下一篇再说;

replicaShardAllocator则需要首先找到primary上的数据,只有primary上的数据不为空或是没有被损坏,才会进行allocate操作,然后需要确定要分配到的node,当然也会check decision,但是这里只会对THROTTLE做check,确定之后同样是initialize过程。但是如果我们没有找到分配的node,那么会先判断这个replica的shard是否需要delay,如果需要就要忽略。

gateway的allocate结束之后,则是数据的allocate过程,对于shard的allocate是有一定的顺序的,首先是要对于Priority比较高的做(通过index.priority来配置),如果priority相同,则要根据index create的先后(index创建的时候会给该index附一个index.creation_date的metadata),如果仍然相同,就得根据index name的字母的逆序了。然后是根据decision确定一个node,然后就是initialize这个shard:routingNodes.initialize(shard, routingNodes.node(minNode.getNodeId()).nodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));

5、通过deciders来判断shard是否要继续保留在该shard上,这个check只针对状态为started的shard,如果不需要保留,则remove到另外的node:shardsAllocators.move(shardRouting, routingNode, allocation);

6、最后一步rebalance:在rebalance之前要确保当前已经没有进行中的allocate。rebalance其实是从负载高的node向负载低的做转移。

至此,allocate的过程大体完成。

总结

这一节总算是看完了,虽然有些细节之处并未说得很清楚,小生惭愧,的确是有些代码尚未看懂,所以不敢妄言。内容实在是多,那我们就在下一节再来说说recovery的过程。


转载请注明出处:http://www.opscoder.info/es_allocation.html

其他分类: