jasper的技术小窝

关注DevOps、运维监控、Python、Golang、开源、大数据、web开发、互联网

elasticsearch源码分析之java客户端

作者:jasper | 分类:ElasticSearch | 标签:   | 阅读 3471 次 | 发布:2015-11-22 11:44 p.m.

本文所说的java客户端主要只是讲解一下用于插数据的client的原理,我们都知道往ES发数据有三种protocol分别是node、http和transport;其实对于其他client而言最终都是使用的http;而java是可以使用node和transport的,node方式一般很少用,所以我们只探究transport client,那么我们且来看看吧。

发送端例子

对于java client的数据发送(这里以bulk为例),写过的人都知道,其实是很简单的,因为大部分事情都已经被client做掉了,那么我们先给出例子感知一下:

client初始化

Settings settings = Settings.settingsBuilder()
            .put("cluster.name", "myClusterName")
            .put("client.transport.sniff", true).build();
client=new TransportClient.builder().settings(settings).build()
    .addTransportAddress(new InetSocketTransportAddress("host1",9300))
    .addTransportAddress(new InetSocketTransportAddress("host2",9300));

bulk数据发送

对于数据的发送ES提供了两种方式:

第一种bulk api:

import static org.elasticsearch.common.xcontent.XContentFactory.*;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}

可以看到这种方式是由client端自己添加数据,然后调用BulkResponse bulkResponse = bulkRequest.get();来完成数据的发送。

第二种叫做Bulk Processor:

import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } 
        })
        .setBulkActions(10000) 
        .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) 
        .setFlushInterval(TimeValue.timeValueSeconds(5)) 
        .setConcurrentRequests(1) 
        .build();

初始化bulk processor之后,客户端只需要往bulkProcessor添加数据即可bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));,你可以先配置好bulk的size、interval等,其他的事情就交给processor自己去做吧。

两种方式各有利弊,第一种要自己控制bulk size和interval,但是有利于对发送失败的处理;而第二种简单易用,用户只管add数据就好,但是对于使用回调函数来处理异常会不那么方便,如何选择就看使用场景的了。

下来才来正式分析client的原理:

client初始化

初始化的逻辑有点类似于之前一遍说的ES启动过程,也是用过Guice来依赖注入的,添加了一些有用的模块: 最后一句才是实例化这个client,那我们来看看实例化的逻辑:

    super(injector.getInstance(Settings.class), injector.getInstance(ThreadPool.class), injector.getInstance(Headers.class));
    this.injector = injector;
    nodesService = injector.getInstance(TransportClientNodesService.class);
    proxy = injector.getInstance(TransportProxyClient.class);

查看了一下,对于client的一些调用其实最后都追溯到了TransportClientNodesService中,这个service负责客户端和服务端的连接,所以我们来重点看看。

client与server的连接

在TransportClientNodesService中维护了三个node list:

  • listedNodes:client初始化时addTransportAddress配置的node;
  • nodes:和服务端建立的node;
  • filteredNodes:没有建立连接的node;

在client调用addTransportAddress时,就会往listedNodes中添加配置的node,之后会调用nodesSampler.sample();这里补充一下nodesSampler实例会根据client.transport.sniff的配置来决定使用哪种方式,分别是SniffNodesSampler和SimpleNodeSampler;

我们先来看看SniffNodesSampler,这意味着client会主动发现集群里的其他节点;在sample中client会去ping listedNodes和nodes中所有节点,默认ping的interval为5s;如果ping的node在nodes list里面,意味着是要真正建立连接的node,则创建fully connct;如果不在则创建light connect。(这里的light是指只创建ping连接,fully则会创建前一篇文章所说的那五种连接)。然后对这些node发送一个获取其state的请求,获取集群所有的dataNodes,对这些nodes经过再次确认后就放入nodes中。

再来看看SimpleNodeSampler咯;同样是ping listedNodes中的所有node,区别在于这里创建的都是light connect。对这些node发送一个TransportLivenessAction的请求,这个请求会返回一个自发现的node info,把这个返回结果中真实的node加入nodes,如果返回时空,仍然会加入nodes,因为可能目标node还没有完成初始化还获取不到信息。

可以看到两者的最大不同之处在于nodes列表里面的node,也就是SimpleNodeSampler让集群中的某些个配置的节点,专门用于接受用户请求。SniffNodesSampler的话,所有节点都会参与负载。

那么发送数据的时候会选择哪一个呢?其实都在execute方法里面: 其实关键的就一句话DiscoveryNode node = nodes.get((index) % nodes.size());通过Round robin来生成发送的node,以达到负载均衡的效果。

bulkRequest

先说说上面的第一种方式,通过prepareBulk()创建了一个bulkRequest实例,add的时候实际上是添加了一个IndexRequest的实例,添加的这些request都放在了一个list里面。当执行get()的时候,其实最终是落到了doExecute()上,其会用其代理中的execute,这里面: 就是调用了我上面说的获得发送node的方法,获取node之后将请求发过去:

    transportService.sendRequest(node, action.name(), request, transportOptions, new ActionListenerResponseHandler<Response>(listener) {
        @Override
        public Response newInstance() {
            return action.newResponse();
        }
    });

Bulk Processor

Bulk Processor的数据发送逻辑和上面的bulkRequest是一样的,最后都落在了doExecute()上,所以这里就不重复了,就说说是怎么确定什么时候发送的。如果配置了bulk interval,那么会启动一个特定的scheduler,这里叫做scheduleWithFixedDelay,即使通过传入的interval来触发,时间到了就发送。另外的bulk size和bulk size是在每次add之后就会做一次判断: 满足条件则发送。

总结

整个client大体就是这么些内容,我这边只是以bulk来说明的。对于server端收到数据后会做怎么样的事情,我们下一篇再讲。

好累,睡觉~~~


转载请注明出处:http://www.opscoder.info/es_javaclient.html

其他分类: