Spark Streaming 主动拉取（poll）Flume 数据

xiaoxiao2021-02-28  146

import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/** Streaming word count over events that Spark actively pulls ("polls") from a Flume agent.
  *
  * With the polling (pull-based) approach, Spark connects to the Flume agent and fetches
  * batches of events itself. The agent at the given address must therefore be running
  * Spark's custom `SparkSink`.
  *
  * Usage: `FlumePollWordCount [host] [port]`. When omitted, the defaults are
  * host `172.16.0.11` and port `8888`.
  */
object FlumePollWordCount {

  private val DefaultHost = "172.16.0.11"
  private val DefaultPort = 8888

  /** Entry point: starts the streaming job and blocks until it terminates.
    *
    * @param args optional `host` (args(0)) and `port` (args(1)) of the Flume SparkSink
    * @throws NumberFormatException if the port argument is not an integer
    */
  def main(args: Array[String]): Unit = {
    val host = args.lift(0).getOrElse(DefaultHost)
    val port = args.lift(1).map(_.toInt).getOrElse(DefaultPort)

    // local[2]: one thread is occupied by the receiver, so at least one more is needed
    // to actually process the batches.
    val conf = new SparkConf().setAppName("FlumePollWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Pull data from Flume (the address of the agent's SparkSink).
    val addresses = Seq(new InetSocketAddress(host, port))
    val flumeStream =
      FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK)

    val words = flumeStream.flatMap(e => bodyToWords(e.event.getBody))
    val counts = words.map((_, 1)).reduceByKey(_ + _)
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }

  /** Decodes a Flume event body and splits it into non-empty whitespace-separated words.
    *
    * Only the readable region of the buffer (position..limit) is decoded, and a duplicate
    * is used so the caller's buffer position is not advanced. Assumes event bodies are
    * UTF-8 text (TODO: confirm against the Flume source configuration).
    */
  private def bodyToWords(body: ByteBuffer): Array[String] =
    StandardCharsets.UTF_8
      .decode(body.duplicate())
      .toString
      .split("\\s+")
      .filter(_.nonEmpty) // a leading separator yields an empty first token
}
转载请注明原文地址: https://www.6miu.com/read-25988.html

最新回复(0)