jasper的技术小窝

关注DevOps、运维监控、Python、Golang、开源、大数据、web开发、互联网

谈谈ElasticSearch中的那些River Plugin

作者:jasper | 分类:ElasticSearch | 标签:           | 阅读 7569 次 | 发布:2015-03-14 12:20 a.m.

今天在网上浪荡的时候,无意中看到了Elasticsearch结合River JDBC的运用。之前也看到很多人在做这么一件事,把数据从之前的数据库里搬运到ES中,但是发现一般都是写脚本的,去官网上搜了一把,发现有很多的River Plugin可用,包括各种队列的,这里就简单介绍一下。

其实Elasticsearch本身官方只支持这四种River Plugin:

  • CouchDB River Plugin

  • RabbitMQ River Plugin

  • Twitter River Plugin

  • Wikipedia River Plugin

额,看着最后两个似乎没有什么实际的用途,但是强大的社区支持下,又有很多第三方的River Plugin被提供出来,不得不佩服开源界的力量啊,简单地列几个有用的吧:

  • FileSystem River Plugin

  • JDBC River Plugin

  • Kafka River Plugin

  • MongoDB River Plugin

  • Redis River Plugin

  • Solr River Plugin

……

你可以发现有了这些plugin,如果只是做一个数据的搬运,就可以不用logstash了。当然如果你还需要对数据做一些处理(比如logstash中的filter的那些功能),那你还是用logstash吧,因为我看了一下,这些plugin没有提供数据处理功能。

挑了两个看了一下他们的源代码,发现其实他们的原理很简单,就是在es中起了一些线程去帮忙做这种搬运的工作,然后利用es的java api,根据es相关的配置要求,将数据塞给es。 比如JDBC River,如果你用脚本和plugin,你的架构可能有这样的不同:

下面就简单地以Kafka和JDBC River说说他们的用法: 其实他们的安装就和其他es的plugin一样

bin/plugin --install xxx ……

因为他们都实现了es中的org.elasticsearch.plugins.AbstractPlugin接口。

Kafka River: 将Kafka River作为一个plugin发布到es中,需要执行:

curl -XPUT 'localhost:9200/_river/kafka-river/_meta' -d '
{
    "type" : "kafka",
    "kafka" : {
       "zookeeper.connect" : "localhost", 
       "zookeeper.connection.timeout.ms" : 10000,
       "topic" : "river",
       "message.type" : "json",
       "partition": 2
   },
   "index" : {
       "index" : "kafka-index",
       "type" : "status",
       "bulk.size" : 100,
       "concurrent.requests" : 1,
       "action.type" : "index"
    }
}'

哈哈,其实这样就可以了,是不是感觉和logstash差不多,就好像将logstash植入到es中了一样。

JDBC River: 和Kafka River不同的是,如果你只是做一次数据搬运:

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
  "type" : "jdbc",
  "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "",
        "password" : "",
        "sql" : "select * from orders",
        "index" : "myindex",
        "type" : "mytype",
    ...          
    }
}'

注意这里的jdbc的key可以是一个list,这样就允许多个数据源了。

但是数据库不像是队列,如果要实时更新怎么办?这个plugin提供了一个schedule的功能,可以定时更新数据,比如下面的配置,会每十分钟load最新的和updated后的数据:

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
  "type" : "jdbc",
  "schedule" : "0 */10 * * * ?", 
  "jdbc" : {
        "url" : "jdbc:oracle:thin:@//{IP}:1521/{SERVICE}",
        "user" : "{USERNAME}",
        "password" : "{PASSWORD}",
        "sql" : [{
          "statement" : "{QUERY where created_on > ? or modified_on > ?}",
          "parameter" : [ "$river.state.last_active_begin", "$river.state.last_active_begin" ]
    }],
    "index" : "{INDEX}",
    "type" :  "{TYPE}"
  }
}'

感觉是不是一身轻松,可以大大减轻我们的工作量。

但是这个干还是有些事情是你要去考虑的:

  1. 前面已经说过了,不能对数据做额外的处理;
  2. 将数据的采集和es强耦合在了一起;
  3. 在大规模集群下的任务分配不好处理。

我现在还没有实践过,待我实践之后,再补充一些可能遇到的问题,但是至少给我们提供了一个不一样的处理方式,特别是对于一些数据量小,索引少,而又不须做数据处理的来说,是很便捷的,至少省去了logstash那一层。


转载请注明出处:http://www.opscoder.info/elasticsearch_river_plugin.html

其他分类: