WordCountSpout
/**
 * Spout that feeds the word-count topology with sentences.
 *
 * <p>Every call to {@link #nextTuple()} picks one entry of a fixed sentence list at random and
 * emits it as a one-field tuple whose field is named {@code "line"}.
 */
public class WCSpout extends BaseRichSpout {
    /** Collector used to emit tuples; assigned in {@link #open}. */
    private SpoutOutputCollector collector;

    /**
     * Random source shared by all {@link #nextTuple()} calls of this instance.
     * Created in {@link #open} (not at field initialization) so that every opened
     * instance builds its own generator instead of allocating a new one per tuple.
     */
    private Random random;

    /** Sentences the spout chooses from. */
    private String[] lines = {
        "this is first line",
        "this is second line",
        "this is third line"
    };

    /**
     * Emits one randomly selected sentence.
     *
     * <p>Requires {@link #open} to have run first, since both the collector and the
     * random source are set up there.
     */
    @Override
    public void nextTuple() {
        // Debug marker printed on every call.
        System.out.println("aa");
        collector.emit(new Values(lines[random.nextInt(lines.length)]));
    }

    /**
     * Stores the collector and creates the random source used by {@link #nextTuple()}.
     *
     * @param conf      configuration map (not used here)
     * @param context   topology context (not used here)
     * @param collector collector that {@link #nextTuple()} emits through
     */
    @SuppressWarnings("rawtypes")
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    /**
     * Declares the single output field {@code "line"}.
     *
     * @param declarer receives the output field declaration
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}
WordCountSplitBolt
/**
 * Bolt that breaks a sentence into words.
 *
 * <p>Reads the {@code "line"} field of each incoming tuple, splits it on single spaces, and
 * emits one {@code ("word", "count")} tuple per piece with the count set to {@code 1}.
 */
public class WCSplitBlot extends BaseRichBolt {
    /** Collector used to emit the split words; assigned in {@link #prepare}. */
    private OutputCollector collector;

    /**
     * Stores the collector for later use by {@link #execute}.
     *
     * @param conf      configuration map (not used here)
     * @param context   topology context (not used here)
     * @param collector collector that {@link #execute} emits through
     */
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Splits the tuple's {@code "line"} value and emits each piece paired with {@code 1}.
     *
     * @param tuple incoming tuple carrying a string field named {@code "line"}
     */
    @Override
    public void execute(Tuple tuple) {
        final String[] tokens = tuple.getStringByField("line").split(" ");
        for (int i = 0; i < tokens.length; i++) {
            collector.emit(new Values(tokens[i], 1));
        }
    }

    /**
     * Declares the two output fields {@code "word"} and {@code "count"}.
     *
     * @param declarer receives the output field declaration
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        final Fields outputFields = new Fields("word", "count");
        declarer.declare(outputFields);
    }
}
WordCountCountBolt
/**
 * Bolt that keeps a running total per word.
 *
 * <p>Each incoming tuple supplies a {@code "word"} and a {@code "count"}; the count is added to
 * the total stored for that word in an in-memory map, and the whole map is printed to standard
 * output after every update. The bolt declares no output fields and emits nothing.
 */
public class WCCountBlot extends BaseRichBolt {
    /** Collector handed over in {@link #prepare}; stored but not otherwise used by this bolt. */
    private OutputCollector collector;

    /** Running totals keyed by word. */
    private Map<String, Integer> map = new HashMap<String, Integer>();

    /**
     * Adds the tuple's count to the running total of its word and prints all totals.
     *
     * @param tuple incoming tuple carrying a string field {@code "word"} and an integer field
     *              {@code "count"}
     */
    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        // Read the count by field name. Feeding the count value into the positional
        // getter would treat the count itself as a field index, which only yields the
        // right field when the count happens to equal that field's position.
        Integer count = tuple.getIntegerByField("count");

        if (!map.containsKey(word)) {
            map.put(word, count);
        } else {
            Integer currentCount = map.get(word);
            map.put(word, currentCount + count);
        }
        System.out.println(map);
    }

    /**
     * Stores the collector.
     *
     * @param conf      configuration map (not used here)
     * @param context   topology context (not used here)
     * @param collector output collector for this bolt
     */
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Declares nothing: this bolt does not emit tuples.
     *
     * @param declarer unused
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
MainTopology
/**
 * Entry point that wires the word-count topology together and submits it to a
 * {@code LocalCluster}.
 *
 * <p>Wiring, as configured below:
 * <ul>
 *   <li>{@code "mySpout"}: {@code WCSpout}, parallelism hint 2</li>
 *   <li>{@code "mybolt1"}: {@code WCSplitBlot}, parallelism hint 2, shuffle grouping on
 *       {@code "mySpout"}</li>
 *   <li>{@code "mybolt2"}: {@code WCCountBlot}, parallelism hint 4, fields grouping on
 *       {@code "mybolt1"} by the {@code "word"} field</li>
 * </ul>
 */
public class WCTopology {
    /**
     * Builds the topology and submits it under the name {@code "mywordcount"}.
     *
     * @param args command-line arguments (ignored)
     * @throws Exception if building or submitting the topology fails
     */
    public static void main(String[] args) throws Exception {
        // Runtime configuration: two workers.
        Config conf = new Config();
        conf.setNumWorkers(2);

        // Component graph: spout -> split bolt -> count bolt.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("mySpout", new WCSpout(), 2);
        builder.setBolt("mybolt1", new WCSplitBlot(), 2).shuffleGrouping("mySpout");
        builder.setBolt("mybolt2", new WCCountBlot(), 4).fieldsGrouping("mybolt1", new Fields("word"));

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("mywordcount", conf, builder.createTopology());
    }
}