Spark Streaming 之 Kafka —— Spark Streaming 对接 Kafka 的方式二:Kafka Direct

xiaoxiao2021-02-28  46

package com.yys.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import kafka.serializer.StringDecoder

/**
  * Streaming word count that reads from Kafka through the direct stream API
  * (`KafkaUtils.createDirectStream`), i.e. "approach two" for connecting
  * Spark Streaming to Kafka. The original author notes that this is the
  * approach used in roughly 90% of real-world work.
  *
  * Usage: `KafkaDirectWordCount <brokers> <topics>`
  *   - `brokers`: value passed as Kafka's `metadata.broker.list`
  *   - `topics`:  comma-separated list of topic names to consume
  */
object KafkaDirectWordCount {

  /**
    * Entry point: validates the arguments, builds a 5-second-batch streaming
    * context, counts space-separated words per batch and prints the counts.
    *
    * @param args exactly two elements: the broker list and the comma-separated topics
    */
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: KafkaDirectWordCount <brokers> <topics>")
      System.exit(1)
    }

    val Array(brokers, topics) = args

    // App name and master are intentionally left unset here so that they can
    // be supplied at submit time. For a local run, chain for example:
    //   .setAppName("KafkaDirectWordCount").setMaster("local[2]")
    val sparkConf = new SparkConf()
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Tolerate "a, b" and stray commas: trim each name and drop empty entries.
    val topicsSet = topics.split(",").map(_.trim).filter(_.nonEmpty).toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    // Connect Spark Streaming to Kafka: String keys and values, both decoded
    // with StringDecoder.
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet
    )

    // Each record is a (key, value) pair; the message payload is the second
    // element, so that is what gets split into words.
    messages
      .map(_._2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
转载请注明原文地址: https://www.6miu.com/read-2620273.html

最新回复(0)