IRichBolt和IBasicBoltBaseBasicBolt对比

xiaoxiao2021-02-28  16

storm消息的可靠处理 IRichBolt和IBasicBolt/BaseBasicBolt对比 使用IBasicBolt/BaseBasicBolt不需要总是调用collect.ack,storm会帮我们处理。

对于spout,有ISpout,IRichSpout,BaseRichSpout

对于bolt,有IBolt,IRichBolt,BaseRichBolt,IBasicBolt,BaseBasicBolt

IBasicBolt,BaseBasicBolt不用每次execute完成都写ack/fail,因为已经帮你实现好了。

 作 为storm的使用者,有两件事情要做以更好的利用storm的可靠性特征。 首先,在你生成一个新的tuple的时候要通知storm; 其次,完成处理一个tuple之后要通知storm。 这样storm就可以检测整个tuple树有没有完成处理,并且通知源spout处理结果。storm提供了一些简洁的api来做这些事情。由一个tuple产生一个新的tuple称为: anchoring。你发射一个新tuple的同时也就完成了一次anchoring。看下面这个例子: 这个bolt把一个包含一个句子的tuple分割成每个单词一个tuple。   

Java代码  

public class SplitSentence implements IRichBolt {   

            Output Collector _collector;   

        

            public void prepare(Map conf,   

                                TopologyContext context,   

                                OutputCollector collector) {   

                _collector = collector;   

            }   

        

            public void execute(Tuple tuple) {   

                String sentence = tuple.getString(0);   

                for(String word: sentence.split(" ")) {   

                    _collector.emit(tuple,newValues(word));   

                }   

                _collector.ack(tuple);   

            }   

        

            public void cleanup() {   

            }   

        

            public void declareOutputFields(OutputFieldsDeclarer declarer) {   

                declarer.declare(newFields("word"));   

            }   

        }    

  我 们通过anchoring来构造这个tuple树,最后一件要做的事情是在你处理完当个tuple的时候告诉storm,  通过OutputCollector类的ack和fail方法来做,如果你回过头来看看SplitSentence的例子, 你可以看到“句子tuple”在所有“单词tuple”被发出之后调用了ack。你可以调用OutputCollector 的fail方法去立即将从消息源头发出的那个tuple标记为fail, 比如你查询了数据库,发现一个错误,你可以马上fail那个输入tuple, 这样可以让这个tuple被快速的重新处理, 因为你不需要等那个timeout时间来让它自动fail。每个你处理的tuple, 必须被ack或者fail。因为storm追踪每个tuple要占用内存。所以如果你不ack/fail每一个tuple, 那么最终你会看到OutOfMemory错误。大 多数Bolt遵循这样的规律:读取一个tuple;发射一些新的tuple;在execute的结束的时候ack这个tuple。这些Bolt往往是一些 过滤器或者简单函数。Storm为这类规律封装了一个BasicBolt类。如果用BasicBolt来做, 上面那个SplitSentence可以改写成这样:   

Java代码  

public class SplitSentence implements IBasicBolt {   

            public void prepare(Map conf,   

                                TopologyContext context) {   

            }   

        

            public void execute(Tuple tuple,   

                                BasicOutputCollector collector) {   

                String sentence = tuple.getString(0);   

                for(String word: sentence.split(" ")) {   

                    collector.emit(newValues(word));   

                }   

            }   

        

            public void cleanup() {   

            }   

        

            public void declareOutputFields(   

                            OutputFieldsDeclarer declarer) {   

                declarer.declare(newFields("word"));   

            }   

        }    

 这个实现比之前的实现简单多了, 但是功能上是一样的。

发送到BasicOutputCollector的tuple会自动和输入tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack的。我们编写的时候使用IBasicBolt最方便了。或者 extends BaseBasicBolt类

转载请注明原文地址: https://www.6miu.com/read-1650213.html

最新回复(0)