「官文译」Spark Streaming2.1.1 + Kafka集成指南(Kafka代理版本0.8.2.1或更高版本)

xiaoxiao2021-02-28  94

这里我们解释如何配置Spark Streaming来接收来自Kafka的数据。有两种方法 - 使用Receivers和Kafka的高级API的旧方法,以及一种新方法(在Spark 1.3中引入),而不使用Receivers。他们有不同的编程模型,性能特征和语义保证,所以阅读更多的细节。两种方法都被认为是当前版本的Spark的稳定API。

方法1:基于接收者的方法

此方法使用Receiver接收数据。接收器使用Kafka高级消费者API实现。与所有接收器一样,通过接收器从Kafka接收的数据存储在Spark执行程序中,然后由Spark Streaming启动的作业处理数据。

然而,在默认配置下,这种方法可能会丢失数据失败(请参阅接收器的可靠性)为了确保零数据丢失,您必须在Spark Streaming(引入Spark 1.2)中另外启用写入前端日志,同步保存所有接收的Kafka数据在分布式文件系统(例如HDFS)上写入前端日志,以便所有数据可以在故障时恢复。有关写入日志的更多详细信息,请参阅流编程指南中的部署部分

接下来,我们将讨论如何在流媒体应用程序中使用此方法。

链接:对于使用SBT / Maven项目定义的Scala / Java应用程序,请将流式应用程序与以下工件链接(有关详细信息,请参阅主编程指南中的链接部分)。

groupId = org.apache.spark artifactId = spark-streaming-kafka-0-8_2.11 version = 2.1.1

对于Python应用程序,在部署应用程序时,您必须添加上述库及其依赖关系。请参阅下面的部署小节。

编程:在流应用程序代码中,导入KafkaUtils并创建输入DStream,如下所示。

import org.apache.spark.streaming.kafka._ val kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

您还可以使用变体形式指定键和值类及其对应的解码器类createStream请参阅API文档 和示例。

要记住:

Kafka中的主题分区与Spark Streaming中生成的RDD分区无关。因此,增加主题专用分区KafkaUtils.createStream()的数量只会增加使用单个接收器中消耗的主题的线程数。它不会增加Spark在处理数据时的并行性。有关更多信息,请参阅主文档。

可以使用不同的组和主题创建多个Kafka输入DStream,以便使用多个接收器并行接收数据。

如果已使用像HDFS这样的复制文件系统启用了“写入前端日志”,则接收到的数据已被复制到日志中。因此,输入流的存储级别的存储级别StorageLevel.MEMORY_AND_DISK_SER(即使用 KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER))。

部署:与任何Spark应用程序一样,spark-submit用于启动应用程序。但是,Scala / Java应用程序和Python应用程序的细节略有不同。

对于Scala和Java应用程序,如果您正在使用SBT或Maven进行项目管理,那么将spark-streaming-kafka-0-8_2.11其及其依赖关系到应用程序JAR中。确保spark-core_2.11并将spark-streaming_2.11其标记为provided与Spark安装中已存在的依赖关系。然后用于spark-submit启动应用程序(请参阅主编程指南中的部署部分)。

对于缺少SBT / Maven项目管理的Python应用程序spark-streaming-kafka-0-8_2.11及其依赖项可以直接添加到spark-submit使用中--packages(请参阅应用程序提交指南)。那是,

./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1 ...

另外,您也可以下载Maven构件的JAR spark-streaming-kafka-0-8-assembly从 Maven仓库,并将其添加到spark-submit--jars

方法2:直接接近(无接收机)

在Spark 1.3中引入了这种新的无接收器“直接”方法,以确保更强的端到端保证。这种方法不是使用接收器来接收数据,而是定期查询Kafka在每个主题+分区中的最新偏移量,并相应地定义每个批处理中要处理的偏移范围。当处理数据的作业被启动时,Kafka的简单消费者API用于读取Kafka定义的偏移范围(类似于从文件系统读取文件)。请注意,此功能在Spark 1.3中为Scala和Java API引入,在Spark 1.4中为Python API。

与基于接收器的方法相比,该方法具有以下优点(即方法1)。

简化并行性:不需要创建多个输入卡夫卡流并联合它们。随着directStream,Spark Streaming将创建尽可能多的RDD分区,因为有Kafka分区要消费,这将分别从Kafka读取数据并行。因此,在Kafka和RDD分区之间存在一对一映射,这更容易理解和调整。

效率:在第一种方法中实现零数据丢失需要将数据存储在写入前端日志中,进一步复制数据。这实际上是低效的,因为数据有效地被复制了两次 - 一次是由卡夫卡(Kafka),另一次是写入前端日志(Write Ahead Log)。第二种方法消除了问题,因为没有接收器,因此不需要写入前端日志。只要您有足够的卡夫卡保留,可以从卡夫卡恢复邮件。

完全一次的语义:第一种方法使用Kafka的高级API在Zookeeper中存储消耗的偏移量。这通常是从卡夫卡消费数据的方式。虽然这种方法(结合写入日志)可以确保零数据丢失(即至少一次语义),但是在某些故障下,一些记录可能会被消耗两次的可能性很小。这是由于Spark Streaming可靠接收的数据与Zookeeper跟踪的偏移量之间的不一致。因此,在第二种方法中,我们使用不使用Zookeeper的简单Kafka API。Spark Streaming在其检查点内跟踪偏移量。这消除了Spark Streaming和Zookeeper / Kafka之间的不一致,因此,尽管出现故障,Spark Streaming仍然可以有效地收到每个记录。

请注意,该方法的一个缺点是它不会在Zookeeper中更新偏移量,因此基于Zookeeper的Kafka监视工具将不会显示进度。但是,您可以在每个批次中访问此方法处理的偏移量,并自己更新Zookeeper(见下文)。

接下来,我们将讨论如何在流媒体应用程序中使用此方法。

链接:仅在Scala / Java应用程序中支持此方法。将SBT / Maven项目与以下工件链接(有关更多信息,请参阅主编程指南中的链接部分)。

groupId = org.apache.spark artifactId = spark-streaming-kafka-0-8_2.11 version = 2.1.1

编程:在流应用程序代码中,导入KafkaUtils并创建输入DStream,如下所示。

import org.apache.spark.streaming.kafka._ val directKafkaStream = KafkaUtils.createDirectStream[ [key class], [value class], [key decoder class], [value decoder class] ]( streamingContext, [map of Kafka parameters], [set of topics to consume])

您还可以传递messageHandler到包含当前消息的元数据的createDirectStream访问MessageAndMetadata,并将其转换为任何所需类型。请参阅API文档 和示例。

在Kafka参数中,您必须指定metadata.broker.listbootstrap.servers默认情况下,它将从每个Kafka分区的最新偏移开始消耗。如果您将配置设置auto.offset.reset为Kafka参数smallest,那么它将从最小的偏移量开始消耗。

您也可以从任何偏移开始使用其他变体KafkaUtils.createDirectStream此外,如果要访问每个批次中消耗的Kafka抵消,您可以执行以下操作。

// Hold a reference to the current offset ranges, so it can be used downstream var offsetRanges = Array.empty[OffsetRange] directKafkaStream.transform { rdd => offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd }.map { ... }.foreachRDD { rdd => for (o <- offsetRanges) { println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") } ... }

如果您希望使用基于Zookeeper的Kafka监控工具来显示流媒体应用程序的进度,您可以自己更新Zookeeper。

请注意,HasOffsetRanges的类型转换只有在第一个在directKafkaStream中调用的方法中完成时才会成功,而不是稍后在一个方法链中。您可以使用transform()而不是foreachRDD()作为第一个方法调用以访问偏移量,然后再调用Spark方法。但是,请注意,RDD分区和Kafka分区之间的一对一映射在任何洗牌或重新分区的方法(例如reduceByKey()或window())之后不会保留。

另外需要注意的是,由于此方法不使用接收器,标准接收器相关(即表单的配置spark.streaming.receiver.*)将不适用于通过此方法创建的输入DStreams(将适用于其他输入DStreams)。而是使用配置 spark.streaming.kafka.*。一个重要的是spark.streaming.kafka.maxRatePerPartition哪个是每个Kafka分区将被该直接API读取的最大速率(每秒的消息)。

部署:这与第一种方法相同。

转载请注明原文地址: https://www.6miu.com/read-64456.html

最新回复(0)