Spark Streaming is an extension of the core Spark API that provides scalable, high-throughput, fault-tolerant processing of live data streams. Data can be ingested from many sources such as Kafka, Flume, or TCP sockets, processed with complex algorithms, and then pushed out to filesystems, databases, and other persistent stores.
Below is a simple WordCount example written in Scala that demonstrates Streaming's basic stream-processing capability.
package streaming

/**
 * Created by root on 17-7-10.
 */

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    /**
     * Configure SparkConf: local mode with two threads and an application name
     */
    val conf = new SparkConf().setMaster("local[2]").setAppName("Streaming")
    /**
     * Create the StreamingContext with a 1-second batch interval
     */
    val ssc = new StreamingContext(conf, Seconds(1))
    /**
     * Configure the input data source: listen on a socket at localhost:8888
     */
    val input = ssc.socketTextStream("localhost", 8888)
    /**
     * Programming is based on DStreams; a DStream is a sequence of RDDs,
     * so the familiar RDD operators apply
     */
    val wordcount = input.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    /**
     * Transformations alone don't trigger job execution; output operations
     * such as print, saveAsTextFiles, saveAsHadoopFiles, and foreachRDD do
     */
    wordcount.print()
    /**
     * Start the framework and wait for termination
     */
    ssc.start()
    ssc.awaitTermination()
  }
}

The full code is in the GitHub repository: https://github.com/DragonTong/Streaming.git
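As the comments note, transformations on a DStream are lazy; only output operations actually trigger jobs. Here is a minimal sketch of two alternatives to print() (the path prefix "wordcount" and the println sink are placeholders, not part of the original example):

// Write one directory of text files per batch interval;
// "wordcount" is a hypothetical path prefix
wordcount.saveAsTextFiles("wordcount")

// Or hand each batch's RDD to arbitrary sink logic
wordcount.foreachRDD { rdd =>
  rdd.foreach { case (word, count) =>
    // replace println with a real database or message-queue write
    println(s"$word -> $count")
  }
}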
The program proceeds in a few steps. Initialize SparkConf, setting the run mode and core count (local[2]) along with the application name; a SparkConf is the starting point of every Spark application. Note that local mode needs at least two threads here, because the socket receiver permanently occupies one of them. Then initialize the StreamingContext, where Seconds(1) sets the batch interval at which received data is processed. Configure the input stream; this example obtains data through a socket. Build the processing logic from the flatMap, map, and reduceByKey operators, output the result, and finally start the Streaming framework. Run this Streaming program in local mode with:
/usr/local/spark/bin/spark-submit --class streaming.WordCount spark.streaming.pro-1.0-SNAPSHOT.jar
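The jar has to be packaged beforehand; the -1.0-SNAPSHOT suffix suggests a Maven build, in which case (an assumption, the original doesn't show the build step) a standard package command produces it:

mvn clean package

Then, in a separate terminal, start a simple text server on the port the job listens on: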
nc -lk 8888

(Note the port must match the 8888 passed to socketTextStream in the code.)
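Words typed into the nc terminal are now counted once per batch. Typing "hello world hello", for example, should make the Spark console print something of roughly this form (the exact timestamp header varies):

(hello,2)
(world,1)

The same pipeline can also ingest from Kafka, one of the sources mentioned at the top. Here is a minimal sketch, assuming the spark-streaming-kafka-0-10 connector is on the classpath and reusing the ssc from the example above; the broker address, group id, and topic name "words" are placeholders:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",            // placeholder broker
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "wordcount-demo"                      // placeholder group id
)

// Each Kafka record's value plays the role of one socket line above
val kafkaLines = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("words"), kafkaParams)
).map(_.value)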
