Spark Streaming 之 Kafka —— Spark Streaming 对接 Kafka 的方式一:Kafka Receiver

xiaoxiao2021-02-28  56

package com.yys.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streaming + Kafka integration, approach 1: a receiver-based stream
  * created with `KafkaUtils.createStream`.
  *
  * Splits every received Kafka message on single spaces, counts the words of
  * each 5-second batch and prints the counts.
  */
object KafkaReceiverWordCount {

  /**
    * Program entry point.
    *
    * @param args exactly four arguments, in this order:
    *             `zkQuorum`   - passed to `KafkaUtils.createStream` as the ZooKeeper quorum,
    *             `group`      - passed as the consumer group id,
    *             `topics`     - comma-separated list of topic names,
    *             `numThreads` - integer used as the thread count for every topic.
    *             With any other argument count the usage message is printed to
    *             stderr and the JVM exits with status 1.
    */
  def main(args: Array[String]): Unit = {
    // Parameters are passed via args; <> in the usage text marks a required argument.
    if (args.length != 4) {
      System.err.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
      // Stop here: without this, the destructuring below fails with a MatchError.
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads) = args

    // App name and master are intentionally not hard-coded here
    // (presumably supplied at submit time, e.g. via spark-submit -- confirm):
    //   .setAppName("KafkaReceiverWordCount")
    //   .setMaster("local[2]")
    val sparkConf = new SparkConf()
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Every topic gets the same thread count, so parse it once.
    val threadsPerTopic = numThreads.toInt
    val topicMap = topics.split(",").map(topic => (topic, threadsPerTopic)).toMap

    // Receiver-based connection from Spark Streaming to Kafka.
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    // Each stream element is a pair; the second component is used as the
    // message text (per the createStream API it is the message value, the
    // first being the message key).
    messages
      .map(_._2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
转载请注明原文地址: https://www.6miu.com/read-2621234.html

最新回复(0)