Spark-Streaming的最简单使用

xiaoxiao2021-02-28  103

<!-- Spark Streaming core -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>${spark.version}</version>
</dependency>
<!-- Kafka integration for Spark Streaming.
     NOTE(review): pinned to 1.6.1 while the siblings use ${spark.version} —
     confirm this mismatch is intentional. -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>1.6.1</version>
</dependency>
<!-- Flume integration for Spark Streaming -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-flume_2.10</artifactId>
  <version>${spark.version}</version>
</dependency>

程序1:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Minimal Spark Streaming word count.
 *
 * Receives whitespace-separated text from a TCP socket and prints the
 * per-batch word counts every 5 seconds.
 *
 * Usage: StreamingWordCount [host] [port]
 *   host — socket source host (default: 192.168.33.62)
 *   port — socket source port (default: 9999)
 */
object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    // Host/port can be overridden from the command line; the original
    // hard-coded values remain the defaults, so behavior is unchanged
    // when no arguments are given.
    val host = if (args.length > 0) args(0) else "192.168.33.62"
    val port = if (args.length > 1) args(1).toInt else 9999

    // "local[2]": at least 2 local threads are needed — one for the
    // socket receiver and one for batch processing.
    val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[2]")
    val sc   = new SparkContext(conf)
    // 5-second micro-batch interval.
    val ssc  = new StreamingContext(sc, Seconds(5))

    // Receive lines of text from the socket; a DStream is a sequence of RDDs,
    // e.g. input line "hello tom hello jerry".
    val lines = ssc.socketTextStream(host, port)

    // Split each line into words and count occurrences within each batch.
    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // Print each batch's counts to stdout.
    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

// In another shell, use the nc command to continuously send data to the port:
//   yum -y install nc
//   nc -lk 9999

yum -y install nc

nc -lk 9999

转载请注明原文地址: https://www.6miu.com/read-19729.html

最新回复(0)