大数据(Storm)-WordCount

xiaoxiao2021-02-28  16

WordCountSplot

public class WCSpout extends BaseRichSpout{ private SpoutOutputCollector collector; private String[] lines = { "this is first line", "this is second line", "this is third line" }; @Override public void nextTuple() { Random random = new Random(); System.out.println("aa"); collector.emit(new Values(lines[random.nextInt(lines.length)])); } @SuppressWarnings("rawtypes") @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("line")); } }

WordCountSplitBlot

public class WCSplitBlot extends BaseRichBolt{ private OutputCollector collector; @Override public void execute(Tuple tuple) { String line = tuple.getStringByField("line"); String[] words = line.split(" "); for(String word : words) { collector.emit(new Values(word, 1)); } } @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } }

WordCountCountBlot

public class WCCountBlot extends BaseRichBolt{ private OutputCollector collector; private Map<String, Integer> map = new HashMap<String, Integer>(); @Override public void execute(Tuple tuple) { String word = tuple.getStringByField("word"); Integer count = tuple.getInteger(tuple.getIntegerByField("count")); if(!map.containsKey(word)) { map.put(word, count); }else{ Integer currentCount = map.get(word); map.put(word, currentCount + count); } System.out.println(map); } @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { //最终总计数量, 不输出结果 } }

MainTopology

public class WCTopology { public static void main( String[] args ) throws Exception { TopologyBuilder topologyBuilder = new TopologyBuilder(); topologyBuilder.setSpout("mySpout",new WCSpout(),2); topologyBuilder.setBolt("mybolt1",new WCSplitBlot(),2).shuffleGrouping("mySpout"); topologyBuilder.setBolt("mybolt2",new WCCountBlot(),4).fieldsGrouping("mybolt1", new Fields("word")); Config config = new Config(); config.setNumWorkers(2); //StormSubmitter.submitTopology("mywordcount",config,topologyBuilder.createTopology()); LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mywordcount",config,topologyBuilder.createTopology()); } }
转载请注明原文地址: https://www.6miu.com/read-2626487.html

最新回复(0)