Spark Streaming之Spark Streaming处理Socket数据

xiaoxiao2021-02-28  47

package com.yys.spark.project import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /**   * Spark Streaming处理Socket数据   *   * 测试: nc   */ object NetworkWordCount {   def main(args: Array[String]): Unit = {     val sparkConf = new SparkConf().setMaster("local").setAppName("NetworkWordCount")     /**       * 创建StreamingContext需要两个参数:SparkConf和batch interval       */     val ssc = new StreamingContext(sparkConf, Seconds(5))     val lines = ssc.socketTextStream("spark01", 9999)     val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)     result.print()     ssc.start()     ssc.awaitTermination()   } }
转载请注明原文地址: https://www.6miu.com/read-2619902.html

最新回复(0)