原文地址:https://www.tutorialspoint.com/apache_kafka/apache_kafka_integration_spark.htm
在本章中,我们将讨论如何将Apache Kafka与Spark Streaming API集成。
Kafka是Spark流媒体的潜在消息传递和集成平台。 Kafka充当实时数据流的中心枢纽,并使用Spark Streaming中的复杂算法进行处理。数据处理完成后,Spark Streaming可以将结果发布到HDFS,数据库或仪表板中的另一个Kafka主题或商店中。下图描述了概念流程。
现在,让我们详细介绍Kafka-Spark API。
这是Spark功能的主要入口点。 SparkContext表示与Spark群集的连接,并且可用于在群集上创建RDD,累加器和广播变量。 签名的定义如下所示。
public StreamingContext(String master, String appName, Duration batchDuration, String sparkHome, scala.collection.Seq<String> jars, scala.collection.Map<String,String> environment) master − 要连接的群集URL(e.g. mesos://host:port, spark://host:port, local[4]).appName −作业名称,以显示在集群Web UI上batchDuration − 流数据将被分成批次的时间间隔 public StreamingContext(SparkConf conf, Duration batchDuration)通过提供新的SparkContext所需的配置来创建StreamingContext。
conf − Spark 参数batchDuration − 流数据将被分成批次的时间间隔KafkaUtils API用于将Kafka集群连接到Spark流。 该API具有如下定义的重要方法createStream签名。
public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream( StreamingContext ssc, String zkQuorum, String groupId, scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)上面显示的方法用于创建从Kafka Brokers中提取消息的输入流。
ssc - StreamingContext对象。zkQuorum - Zookeeper仲裁。groupId - 此消费者的组ID。topics - 返回要消费的主题地图。storageLevel - 用于存储接收对象的存储级别。KafkaUtils API还有另一种方法createDirectStream,它用于创建一个输入流,直接从Kafka Brokers中提取消息而不使用任何接收器。 此流可以保证来自Kafka的每条消息都只包含在一次转换中。
示例应用程序在Scala中完成。 要编译应用程序,请下载并安装sbt,scala构建工具(与maven类似)。 主应用程序代码如下所示。
import java.util.HashMap import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord} import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka._ object KafkaWordCount { def main(args: Array[String]) { if (args.length < 4) { System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>") System.exit(1) } val Array(zkQuorum, group, topics, numThreads) = args val sparkConf = new SparkConf().setAppName("KafkaWordCount") val ssc = new StreamingContext(sparkConf, Seconds(2)) ssc.checkpoint("checkpoint") val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1L)) .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2) wordCounts.print() ssc.start() ssc.awaitTermination() } }spark-kafka集成取决于spark,spark流和spakKafka集成jar。 创建一个新的文件build.sbt并指定应用程序的详细信息及其依赖关系。 sbt将在编译和打包应用程序时下载必要的jar。
name := "Spark Kafka Project" version := "1.0" scalaVersion := "2.10.5" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0" libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"运行以下命令来编译和打包应用程序的jar文件。 我们需要将jar文件提交到spark控制台来运行应用程序。
sbt package启动Kafka Producer CLI(在前一章中介绍),创建一个名为my-first-topic的新主题,并提供一些示例消息,如下所示。
Another spark test message运行以下命令将应用程序提交到spark控制台。
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming -kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark -kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>这个应用程序的输出示例如下所示。
spark console messages .. (Test,1) (spark,1) (another,1) (message,1) spark console message ..