sparkstreaming之flume--Spark Streaming整合Flume的第二种方式Pull

xiaoxiao2021-02-28  41

package com.yys.spark import org.apache.spark.SparkConf import org.apache.spark.streaming.flume.FlumeUtils import org.apache.spark.streaming.{Seconds, StreamingContext} /**   * Spark Streaming整合Flume的第二种方式   */ object FlumePullWordCount {   def main(args: Array[String]): Unit = {     if(args.length != 2) {       System.err.println("Usage: FlumePullWordCount <hostname> <port>")       System.exit(1)     }     val Array(hostname, port) = args     val sparkConf = new SparkConf() //.setMaster("local[2]").setAppName("FlumePullWordCount")     val ssc = new StreamingContext(sparkConf, Seconds(5))     //TODO... 如何使用SparkStreaming整合Flume     val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)     flumeStream.map(x=> new String(x.event.getBody.array()).trim)       .flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()     ssc.start()     ssc.awaitTermination()   } }
转载请注明原文地址: https://www.6miu.com/read-2620248.html

最新回复(0)