jasper的技术小窝

关注DevOps、运维监控、Python、Golang、开源、大数据、web开发、互联网

storm的生命周期

作者:jasper | 分类:storm | 标签:   | 阅读 1287 次 | 发布:2015-03-15 12:22 a.m.

这篇文章会从细节开始,讲解topology的生命周期,从运行“storm jar”命令开始,到从nimbus上下载topology,再到supervisor启动worker,再到worker和tasks的启动。也会解释nimbus是怎么监控topology,当topology被kill后怎么shtdown的。

首先关于topologies有很重要的几点: 1. 实际上run的topology不同于用户所指定的,真实的topology有隐式的流和隐式的“acker” bolt,用作去管理acking framework(常用来保证数据的处理)。这个隐式的topology是由system-topology!函数来创建的。 2. system-topology!常被用在下面两种情况:

  • 当nimbus为topology创建任务时;
  • 在worker中,让worker知道需要将消息传送到什么地方。

开始一个topology:

1. “storm jar”命令用特定的参数执行你的class。这个命令做的一个特别的事就是为之后用到的StormSubmitter设置环境变量。code 2,当你的代码执行StormSubmitter.submitTopology,StormSubmitter会做如下的动作:

  • 首先,如果之前没有下载,StormSubmitter会先下载jar包;code
  • jar包的下载是通过Nimbus的Thrift接口;code
  • beginFileUpload函数返回一个在Nimbus的inbox中的路径;
  • 通过uploadChunk,每次下载到达15千字节;
  • 当下载完成后,finishFileUpload 函数将会被调用;
  • 这里是那些Thrift方法在Nimbus中的实现;code
  • 接下来,StormSubmitter在Nimbus的thrift接口上调用submitTopology;code
  • topology的配置通过JSON序列化(JSON的运用,让用任何语言写DSL变得简单可行);
  • 注意到,Thrift submitTopology在jar被下载的Nimbus的inbox的路径上被调用;

3.Nimbus收到topology的提交code 4. Nimbus使topology的配置正常化。正常化最主要的目的是保证每一个单独的task有相同的序列化注册,这样对于使序列化正确进行至关重要。code

  1. Nimbus设置静态topology结构:code

    • Jar包和配置文件保存在本地文件系统中,因为它们对于Zookeeper来说太大了。Jar包和配置文件会被拷贝在目录 {nimbus local dir}/stormdist/{topology id}下
    • setup-storm-static写task-component的mapping到ZK中
    • setup-heartbeats在ZK中为tasks的heartbeat创建了一个目录

    6. Nimbus调用mk-assignment给机器分配tasks:code

    • 分配的记录在这里被定义:code
    • 分配包括:
      • master-code-dir:supervisor做它来从nimbus的机器上下载相应的jar和configs;
      • task->node+port:task id和运行task的worker的map;(一个worker被一个node:port所定义)
      • node->host:node id和hostname的map,这个被worker用作去知道当要和其他的worker通信是,该去在哪台机器上去找,node id被用作定义supervisor,因此多个worker可以运行在同一个机器上。
      • task->start-time-secs:包含task id到nimbus建立task的时间戳的map。这个在监控topology的时候会用到,因为tasks首次被建立的时候会给予一个长时间的timeout到hearbeat(这个timeout定义在"nimbus.task.launch.secs" 配置中)

    7.一旦topologies被分配后,他们就会被初始化为deactived模式。start-storm将数据写到zookeeper中,因此cluster知道topology是否active,可以开始从spouts emitting tuples:code

  2. Supervisor在background运行两个方法:

    • synchronize-supervisor:这个在Zookeeper中的分配改变厚实每隔10秒钟,都会被调用一次:code
      • 如果当前机器上没有code,会从Nimbus机器上下载分配的topologies:code
      • 将本个节点上期望运行的写到本地文件系统。它写一个port-LocalAssignment的map。LocalAssignment包含一个topology id和这个woker的tasks list。code
    • sync-processes:从synchronize-supervisor写入的LFS中读入并去和现在正在运行的相比较。然后是否同步来开启或是停止worker 进程.code
    9.worker进程通过mk-worker函数开始:code
    • worker相互联系,并开启一个线程去监控彼此的改变。因此,如果一个worker重分配了,worker会自动重连到新的worker的地址:code
    • 监控一个topology是否active,并将之存储在storm-active-atom变量中。这个变量可以被tasks使用去决定是否调用spouts上的nextTuple函数code
    • worker将上面的实际tasks创建为线程。code

  3. Tasks是通过mk-task函数来建立的:code
    • Tasks建立一个routing函数接受一个流和一个output tuple,并返回一个tuple将会被发送到的task id的listcode
    • Tasks创建code中的特定spout和bolt:code

Topology的监控:

Nimbus在它的生命周期中会监控topology

  • 在时钟线程上周期性地循环check topologies:code
  • Nimbus的表现被呈现为一个有限状态机code
  • "monitor" event在每一个 "nimbus.monitor.freq.secs"在topology上被调用,他通过reassign-transition调用reassign-topology来完成code
  • reassign-topology调用mk-assignments:相同的函数在分配topology时被首次调用, mk-assignments也能够递增地更新topology
    • mk-assignments checks heartbeats并且根据情况重分配workers;
    • 任何重分配在ZK中的改变,都会触发supervisor同步去开启或是停止workers

Kill一个topology:

1. “storm kill”命令运行code去调用Nimbus Thrift接口去kill topology:code 2. Nimbus接受一个kill命令:code 3.Nimbus将“kill”运用到topology:code 4. kill函数将topology的状态装换为“killed”,在“wait time seconds”之后运行“remove”:code

  • 这个wait time默认就是topology message的timeout,但是也可以被“storm kill”命令时的-w参数重写
  • topology的状态在实际上被shutdown之前的wait time里被置为deactived状态。这给了topology机会去完成在shut down之前正在运行的processing
  • 在kill期间改变状态的同时,要保证kill protocol是可容错的以防Nimbus crash。在startup的时候,如果topology的状态是“killed”,Nimbus调度在“wait time seconds”之后运行“remove”code

5.Remove一个topology并从ZK上清除分配和静态信息。code 6.另起一个线程运行do-cleanup函数,去清除heartbeat目录和存储在本地的jar包和配置文件:code

转载请注明出处:http://www.opscoder.info/storm_lifecycle.html

【上一篇】 谈谈ElasticSearch中的那些River Plugin
【下一篇】 没有了
其他分类: