1、什么是Storm
2、Storm的一些概念
3、Storm的安装
1、安装一个zookeeper集群 2、上传storm的安装包,解压 3、修改配置文件storm.yaml #所使用的zookeeper集群主机 storm.zookeeper.servers: - "Slave5" - "Slave6" - "Slave7" #nimbus所在的主机名 nimbus.host: "Slave5" 启动Storm 在nimbus主机上启动storm 进入到storm的bin目录下启动storm ./storm nimbus 在supervisor主机上启动supervisor ./storm supervisor 启动storm ui,通过http://Slave5:8080可进入到storm ui页面 ./storm ui相关配置(可选)
4、storm常用命令 5、Storm API调用示例(上面那个将数据转换大写、加后缀案例)
所运用到的包都是Storm lib目录下的包,这是Hadoop等一些工具API调用的一个通性,在集群上运行时注意我们编写的Java程序的jdk版本要与Hadoop对应。
RandomWordSpout.java
package cn.itcast.stormdemo; import java.util.Map; import java.util.Random; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; public class RandomWordSpout extends BaseRichSpout{ private SpoutOutputCollector collector; //模拟一些数据 String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"}; //不断地往下一个组件发送tuple消息 //这里面是该spout组件的核心逻辑 @Override public void nextTuple() { //可以从kafka消息队列中拿到数据,简便起见,我们从words数组中随机挑选一个商品发送出去 Random random = new Random(); int index = random.nextInt(words.length); //ͨ通过随机数拿到一个商品 String godName = words[index]; //将商品名封装成tuple,发送消息给下一个组件 collector.emit(new Values(godName)); //每发送一个消息,休眠500ms Utils.sleep(500); } //初始化方法,在spout组件实例化时调用一次 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } //声明本spout组件发送出去的tuple中的数据的字段名 @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("orignname")); } }UpperBolt.java
package cn.itcast.stormdemo; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class UpperBolt extends BaseBasicBolt{ //业务逻辑处理 @Override public void execute(Tuple tuple, BasicOutputCollector collector) { //先获取到上一个组件传递过来的数据,数据在tuple里面 String godName = tuple.getString(0); //将商品名转成大写 String godName_upper = godName.toUpperCase(); //将转换完成的商品名发送出去 collector.emit(new Values(godName_upper)); } //声明该bolt组件要发出去的tuple的字段 @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("uppername")); } }TopMain.java
package cn.itcast.stormdemo; import backtype.storm.Config; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.generated.StormTopology; import backtype.storm.topology.TopologyBuilder; /** * * 组织各个处理组件形成一个完整的处理流程,就是所谓的topology(类似于mapreduce程序中的job,job处理完了就结束了,而topology是阻塞的) * 并且将该topology提交给storm集群运行,topology提交到集群后就将永远无休止地运行,除非人为或者异常退出 * * */ public class TopoMain { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); //将我们的spout组件设置到topology中去 //parallelism_hint:4表示用4个executor线程来执行这个组件 //setNumTasks(8) 设置的是该组件执行时并发task数量,也就意味着1个executor会运行2个task builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8); //将大写转换的bolt组件设置到topology,并指定它接受randomspout组件的消息 //.shuffleGrouping("randomspout")包含两层含义: //1、upperbolt组件接收的tuple消息一定来自于randomspout组件 //2、randomspout组件和upperbolt组件的大量并发task实例之间收发消息时采用的分组策略是随机分组的shuffleGrouping builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout"); //将添加后缀的bolt组件设置到topology,并且指定它接受upperbolt组件的消息 builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt"); //用builder来创建一个topology StormTopology demotop = builder.createTopology(); //配置一些topology在集群中运行时的参数 Config conf = new Config(); //这里设置的是整个demotop所占用的槽位数,也就是worker进程的数量 conf.setNumWorkers(4); //设置日志级别 conf.setDebug(true); conf.setNumAckers(0); //将这个topology提交给storm集群运行 StormSubmitter.submitTopology("demotopo", conf, demotop); } }在集群上运行程序
./storm jar /home/hadoop/app/storm.jar cn.itcast.stormdemo.TopoMain6、Topology运行机制
7、storm记录级容错原理 首先来看一下什么叫做记录级容错?storm允许用户在spout中发射一个新的源tuple时为其指定一个message id, 这个message id可以是任意的object对象。多个源tuple可以共用一个message id,表示这多个源 tuple对用户来说是同一个消息单元。storm中记录级容错的意思是说,storm会告知用户每一个消息单元是否在指定时间内被完全处理了。那什么叫做完全处理呢,就是该message id绑定的源tuple及由该源tuple后续生成的tuple经过了topology中每一个应该到达的bolt的处理。举个例子。在图4-1中,在spout由message 1绑定的tuple1和tuple2经过了bolt1和bolt2的处理生成两个新的tuple,并最终都流向了bolt3。当这个过程完成处理完时,称message 1被完全处理了。 图4-1 在storm的topology中有一个系统级组件,叫做acker。这个acker的任务就是追踪从spout中流出来的每一个message id绑定的若干tuple的处理路径,如果在用户设置的最大超时时间内这些tuple没有被完全处理,那么acker就会告知spout该消息处理失败了,相反则会告知spout该消息处理成功了。在刚才的描述中,我们提到了”记录tuple的处理路径”,如果曾经尝试过这么做的同学可以仔细地思考一下这件事的复杂程度。但是storm中却是使用了一种非常巧妙的方法做到了。在说明这个方法之前,我们来复习一个数学定理。 A xor A = 0. A xor B…xor B xor A = 0,其中每一个操作数出现且仅出现两次。 storm中使用的巧妙方法就是基于这个定理。具体过程是这样的:在spout中系统会为用户指定的message id生成一个对应的64位整数,作为一个root id。root id会传递给acker及后续的bolt作为该消息单元的唯一标识。同时无论是spout还是bolt每次新生成一个tuple的时候,都会赋予该tuple一个64位的整数的id。Spout发射完某个message id对应的源tuple之后,会告知acker自己发射的root id及生成的那些源tuple的id。而bolt呢,每次接受到一个输入tuple处理完之后,也会告知acker自己处理的输入tuple的id及新生成的那些tuple的id。Acker只需要对这些id做一个简单的异或运算,就能判断出该root id对应的消息单元是否处理完成了。下面通过一个图示来说明这个过程。 图4-1 spout中绑定message 1生成了两个源tuple,id分别是0010和1011. 图4-2 bolt1处理tuple 0010时生成了一个新的tuple,id为0110. 图4-3 bolt2处理tuple 1011时生成了一个新的tuple,id为0111. 图4-4 bolt3中接收到tuple 0110和tuple 0111,没有生成新的tuple. 你可能会发现,容错过程存在一个可能出错的地方,那就是,如果生成的tuple id并不是完全各异的,acker可能会在消息单元完全处理完成之前就错误的计算为0。这个错误在理论上的确是存在的,但是在实际中其概率是极低极低的,完全可以忽略。
