jasper的技术小窝

关注DevOps、运维监控、Python、Golang、开源、大数据、web开发、互联网

elasticsearch源码分析之ThreadPool模块

作者:jasper | 分类:ElasticSearch | 标签:   | 阅读 4043 次 | 发布:2015-12-12 9:31 p.m.

在ES中所有的action都是由一些线程去执行的(额,感觉说了一句废话),当然多线程的话就需要线程池来控制,其线程的实现与控制就是有threadpool模块来决定,其实之前的很多模块里面都有用到threadpool,那么本文就来详细地说一说ES的线程池模块——ThreadPool。

ES中用到的threadpool

介绍这一块之前,先来申明几个配置:

int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
int halfProcMaxAt5 = Math.min(((availableProcessors + 1) / 2), 5);
int halfProcMaxAt10 = Math.min(((availableProcessors + 1) / 2), 10);

其中availableProcessors的默认值为Math.min(32, Runtime.getRuntime().availableProcessors());即jvm可用进程数和32之前的最小值,当然也可以通过配置processors来设置这个值。

再来申明下threadpool的类型:

  • CACHED:没有限制的threadpool,当有pending的请求时就会产生一个线程,线程池中没有使用的线程会在过期时间结束后消亡,这里的过期时间默认为5min,当然也可以自行设置,CACHE类型是专门为GENERIC(下面会说到这个)而设置的。
  • FIXED:大小固定设置的threadpool,它有一个queue来存放pending的请求,其中pool的大小默认是core*5,queue_size默认是-1(即是无限制);
  • SCALING:拥有可变大小的pool,其值可在1和设置值之间;

ES中用到的threadpool,我们也根据上面的类型来划分:

CACHED

  • GENERIC:通用的操作,比如node的discovery,上面也说了,默认keep alive时间是5min;

FIXED

  • LISTENER:主要用作java client的执行,默认大小halfProcMaxAt10;
  • GET:用作get操作,默认大小availableProcessors,queue_size为1000;
  • INDEX:用作index或delete操作,默认大小availableProcessors,queue_size为200;
  • BULK:用作bulk操作,默认大小为availableProcessors,queue_size为50;
  • SEARCH:用作count或是search操作,默认大小((availableProcessors * 3) / 2) + 1;queue_size为1000;
  • SUGGEST:用作suggest操作,默认大小availableProcessors,queue_size为1000;
  • PERCOLATE:用作percolate,默认大小为availableProcessors,queue_size为1000;
  • FORCE_MERGE:用作force_merge操作(2.1之前叫做optimize),默认大小为1;

SCALING

  • MANAGEMENT:用作ES的管理,比如集群的管理;默认大小5,keep alive时间为5min;
  • FLUSH:用作flush操作,默认大小为halfProcMaxAt5,keep alive时间为5min;
  • REFRESH:用作refresh操作,默认大小为halfProcMaxAt10,keep alive时间为5min;
  • WARMER:用作index warm-up操作,默认大小为halfProcMaxAt5,keep alive时间为5min;
  • SNAPSHOT:用作snapshot操作,默认大小为halfProcMaxAt5,keep alive时间为5min;
  • FETCH_SHARD_STARTED:用作fetch shard开始操作,默认大小availableProcessors * 2,keep alive时间为5min;
  • FETCH_SHARD_STORE:用作fetch shard存储操作,默认大小availableProcessors * 2,keep alive时间为5min;

注意:对于上面的默认配置,虽然都允许用户自行设置,但是ES官方并不建议这么做,因为他们认为默认的已经是最合理的了,尽量减少上下文切换。详情请戳:https://www.elastic.co/guide/en/elasticsearch/guide/current/_don_8217_t_touch_these_settings.html#_threadpools

其实ES也是允许用户自己定义线程池:

// Building custom thread pools
for (Map.Entry<String, Settings> entry : groupSettings.entrySet()) {
    if (executors.containsKey(entry.getKey())) {
        continue;
    }
    executors.put(entry.getKey(), build(entry.getKey(), entry.getValue(), Settings.EMPTY));
}

Executor的生成

Executor的生成的结果其实是一个EsThreadPoolExecutor的实例,归根到底呢,还是java的并发库java.util.concurrent.ThreadPoolExecutor;中的ThreadPoolExecutor实例,看看怎么生成的吧。

根据不同类型的threadpool,有不同的处理方式,其实就是加载不用的配置而已:

对于CACHED,

public static EsThreadPoolExecutor newCached(String name, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
    return new EsThreadPoolExecutor(name, 0, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new EsAbortPolicy());
}

可以看到第二个参数corePoolSize即pool中保持的线程数为0,第三个参数maximumPoolSize即pool中的最大线程数设置就可以看做无限制的,就像之前说的是有pending的请求时就会产生一个线程,且keep alive根据设置来的。它的queue是个同步的queue,也就是一个insert操作一定会对应一个remove操作,反过来也一样,你也可把它当做一个没有容量的queue。对于reject(其实理论上不会触发,因为pool size没有限制)的处理是EsAbortPolicy,即是会将之放入queue中,如果插入失败当前线程会interrupt,然后也会触发异常IllegalStateException

对于FIXED,

public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity, ThreadFactory threadFactory) {
    BlockingQueue<Runnable> queue;
    if (queueCapacity < 0) {
        queue = ConcurrentCollections.newBlockingQueue();
    } else {
        queue = new SizeBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), queueCapacity);
    }
    return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy());
}

当queue的size小于0时,其实也就是没有设置时,会实例化一个无限制的queue,不然就按queue_size来创建,,创建的是block的queue,其中corePoolSize和maximumPoolSize都是相等的,且keep alive时间为0。对于reject的处理同样是上面的EsAbortPolicy。

对于SCALING,

public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
    ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
    // we force the execution, since we might run into concurrency issues in offer for ScalingBlockingQueue
    EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy());
    queue.executor = executor;
    return executor;
}

这里创建的是一个无限制的queue,并且pool size的大小在最小最大值之间浮动。对于reject的处理是ForceQueuePolicy,即是将之放入queue中,这里的queue是无限制的。

threadpool的状态查看

对于threadpool的状态信息,主要包含下面几个:

threads = threadPoolExecutor.getPoolSize();
queue = threadPoolExecutor.getQueue().size();
active = threadPoolExecutor.getActiveCount();
largest = threadPoolExecutor.getLargestPoolSize();
completed = threadPoolExecutor.getCompletedTaskCount();
rejected = ((XRejectedExecutionHandler) rejectedExecutionHandler).rejected();

我们可以通过cat pool thread api来查看相应的信息。

总结

ES的threadpool其实就是对java自带的threadpool的一层封装,其内部的实现细节大家可以参考infoq上面的这篇文章JAVA线程池的分析和使用。要了解每个不同的动作是用什么类型的threadpool来处理的,以及具体的配置是怎么样的。

另外,虽然ES提供了这些threadpool相关的配置,但是官方并不建议修改这些参数,详情请戳:https://www.elastic.co/guide/en/elasticsearch/guide/current/_don_8217_t_touch_these_settings.html#_threadpools


转载请注明出处:http://www.opscoder.info/es_threadpool.html

其他分类: