程序1:
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * Created by Administrator on 2017/6/6. */ object StreamingWordCount { def main(args: Array[String]) { //LoggerLevels.setStreamingLogLevels() //StreamingContext val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[2]") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(5)) //接收数据 val ds = ssc.socketTextStream("192.168.33.62", 9999) //DStream是一个特殊的RDD //hello tom hello jerry val result = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_) //打印结果 result.print() ssc.start() ssc.awaitTermination() } } 再另一个shell中: 使用nc命令,不断向端口发送数据yum -y install nc
nc -lk 9999