jasper的技术小窝

关注DevOps、运维监控、Python、Golang、开源、大数据、web开发、互联网

从一个storm的序列化错误谈谈component的生命周期

作者:jasper | 分类:storm | 标签:   | 阅读 1483 次 | 发布:2015-03-08 12:18 a.m.

最近搞了搞storm,别问我为什么,我就是什么都会去插一脚的,storm是twitter开源出来的一个流式处理框架,我们用它来实时处理日志,比如做一些计数啊,平均啊,聚合一类的计算。关于其基础知识我就不多说了,网上一大把一大把的,下面我就说说在用的时候踩到的一个坑。

在我开发bolt,我们有时候会需要引用外部的类(比如Test),我们一般会去这么写:

@SuppressWarnings("serial")public class SimpleBolt extends BaseBasicBolt {
    private Test t;
  public SimpleBolt(){   
      this.t = new Test();
   }
    publlic void prepare(Map conf, TopologyContext context, OutputCollector collector) {
       // maybe some config
    }
    public void execute(Tuple input, BasicOutputCollector collector) {
        try {
            String msg = input.getString(0);
            if (msg != null){
                // some calculate
                collector.emit(new Values(xxx));
            }

        } catch (Exception e) {
            e.printStackTrace(); 
        }
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(yyy));
    }
}

在运行的时候,发现一直报Test的java.io.NotSerializableException;于是我做了以下一些试验:

  1. 将 this.t = new Test();实例化的代码放入prepare中,结果是: 成功;
  2. 让Test类可以被序列化,即实现Serialize接口,结果是: 成功;
  3. 将t的声明和实例化都放在构造函数中,其结果是: 成功;

为什么在storm中会牵扯到序列化呢,带着疑问我查看我一把storm的源代码,在在一开始createTopology中会对spout和bolt进行序列化:

public StormTopology createTopology() {
        Map boltSpecs = new HashMap();
        Map spoutSpecs = new HashMap();
        for(String boltId: _bolts.keySet()) {
            IRichBolt bolt = _bolts.get(boltId);
            ComponentCommon common = getComponentCommon(boltId, bolt);
            boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.serialize(bolt)), common));
        }
        for(String spoutId: _spouts.keySet()) {
            IRichSpout spout = _spouts.get(spoutId);
            ComponentCommon common = getComponentCommon(spoutId, spout);
            spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.serialize(spout)), common));

        }
        return new StormTopology(spoutSpecs,
                                 boltSpecs,
                                 new HashMap());
    }

既然有了序列化,那么就必然会有反序列化的代码,于是我找了一下,最后在task中找到了get-task-object方法,在其中代用了Utils/getSetComponentObject方法:

(defn- get-task-object [^TopologyContext topology component-id]
  (let [spouts (.get_spouts topology)
        bolts (.get_bolts topology)
        state-spouts (.get_state_spouts topology)
        obj (Utils/getSetComponentObject
             (cond
              (contains? spouts component-id) (.get_spout_object ^SpoutSpec (get spouts component-id))
              (contains? bolts component-id) (.get_bolt_object ^Bolt (get bolts component-id))
              (contains? state-spouts component-id) (.get_state_spout_object ^StateSpoutSpec (get state-spouts component-id))
              true (throw-runtime "Could not find " component-id " in " topology)))
        obj (if (instance? ShellComponent obj)
              (if (contains? spouts component-id)
                (ShellSpout. obj)
                (ShellBolt. obj))
              obj )
        obj (if (instance? JavaObject obj)
              (thrift/instantiate-java-object obj)
              obj )]
    obj
    ))

而这个Utils/getSetComponentObject方法中,会对component进行反序列化的操作:

public static Object getSetComponentObject(ComponentObject obj) {
        if(obj.getSetField()==ComponentObject._Fields.SERIALIZED_JAVA) {
            return Utils.deserialize(obj.get_serialized_java());
        } else if(obj.getSetField()==ComponentObject._Fields.JAVA_OBJECT) {
            return obj.get_java_object();
        } else {
            return obj.get_shell();
        }
    }

因此这就说明,在storm的task的下发中,会有对component的序列化和反序列化的操作,这样就大致可以描述出component的生命周期了:

  1. 在提交了一个topology之后(在nimbus所在的机器), 创建spout/bolt实例并进行序列化
  2. 将序列化的component发送给所有的任务所在的机器(supervisor)
  3. 在每一个任务上反序列化component.
  4. 在开始执行任务之前, 先执行component的初始化方法(bolt是prepare, spout是open).

supervisor实例化bolts,然后把它们发给worker,接着调用prepare()方法,因此,任何不能序列化的类在prepare()之前实例化都会出错。因此storm官方建议,component的初始化操作应该在prepare/open方法中进行, 而不是在实例化component的时候进行。


转载请注明出处:http://www.opscoder.info/storm_serialize.html

其他分类: