「官文译」Spark 结构Streaming-2.1.1 + Kafka 集成指南 (Kafka broker version 0.10.0 or higher)

xiaoxiao2021-02-28  89

注:

Spark Streaming + Kafka集成指南

Apache Kafka是作为分布式,分区,复制的提交日志服务的发布订阅消息。在开始使用Spark集成之前,请仔细阅读Kafka文档。

Kafka项目在0.8和0.10之间引入了新的消费者api,所以有两个单独的相应的Spark Streaming包可用。请为您的经纪人选择正确的包装和所需功能; 请注意,0.8集成与以后的0.9和0.10经纪人兼容,但0.10集成与早期经纪人不兼容。

  火花流 - 卡夫卡0-8 火花流 - 卡夫卡0-10 经纪人版 0.8.2.1以上 0.10.0以上 Api稳定性 稳定 试验 语言支持 Scala,Java,Python Scala,Java 接收器DStream 是 没有 直接DStream 是 是 SSL / TLS支持 没有 是 偏移提交Api 没有 是 动态主题订阅 没有 是 ===========================================================================(只能最新的结构流才能用这种方式在kafka-0.9以上版本)

创建Kafka源(Batch批次)

源中的每一行都具有以下模式:

Each row in the source has the following schema:

Column Type key binary value binary topic string partition int offset long timestamp long timestampType int

必须为(Batch处理)和(streaming queries.流式查询)的Kafka源设置以下选项。

Option value meaning assign (分配) json string {"topicA":[0,1],"topicB":[2,4]} (json字符窜) Specific TopicPartitions to consume. Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. (具体主题分配消费。只能为Kafka源指定“assign”,“subscribe”或“subscribePattern”选项之一。) subscribe (订阅) A comma-separated list of topics (以逗号分隔的主题列表) The topic list to subscribe. Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. (要订阅的主题列表。只能为Kafka源指定“assign”,“subscribe”或“subscribePattern”选项之一。) subscribePattern (如上2选1 Java regex string (Java正则表达式字符串) The pattern used to subscribe to topic(s). Only one of "assign, "subscribe" or "subscribePattern" options can be specified for Kafka source. (用于订阅主题的模式。只能为Kafka源指定“assign”,“subscribe”或“subscribePattern”选项之一。) kafka.bootstrap.servers A comma-separated list of host:port (主机:端口的逗号分隔列表) The Kafka "bootstrap.servers" configuration.bootstrap(引导程序).servers" configuration. (Kafka“bootstrap.servers”配置。) "bootstrap.servers" -> "localhost:9092,anotherhost:9092" 如配置: val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "localhost:9092", //,anotherhost:9092 "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "use_a_separate_group_id_for_each_stream", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean) )

以下配置是可选的:

选项 值 默认 查询类型 含义 startingOffsets "earliest", "latest" (streaming only),  “最早”,“最新”(仅限流媒体) 或json string“”“{”topicA“:{”0“:23,”1“: - 1},”topicB“:{”0“: - 2} }“”“ “最新”流式传输,“最早”批量 流和批 查询开始时的起点,从最早的偏移“最早”开始,最新的“最新”,即最新偏移量, 或指定每个TopicPartition的起始偏移量的json字符串。 在json中,-2作为偏移可以用来指最早的,-1表示最新的。 注意:对于批处理查询,不允许使用最新的(隐式的或通过使用-1)。 对于流式查询,这仅在新查询启动时适用,并且恢复将始终从查询离开的地方获取。查询期间新发现的分区最早开始。 endingOffsets 最新或者json string {“topicA”:{“0”:23,“1”: - 1},“topicB”:{“0”: - 1}} latest 批次查询 批处理查询结束时的终点,即最新的最新参数,或者指定每个TopicPartition的结束偏移量的json字符串。在json中,-1作为偏移量可以用来指代最新的,而-2(最早的)作为偏移是不允许的。 failOnDataLoss true or false true 流式查询 是否可能丢失数据时失败查询(例如,主题被删除或偏移超出范围)。这可能是一个虚惊。当您无法正常工作时,您可以禁用它。如果由于丢失的数据而无法从提供的偏移中读取任何数据,批量查询将始终失败。 kafkaConsumer.pollTimeoutMs long 512 流和批 以毫秒为单位的执行者轮询Kafka数据的超时时间。 fetchOffset.numRetries INT 3 流和批 在放弃获取Kafka抵消之前重试的次数。 fetchOffset.retryIntervalMs long 10 流和批 毫秒级别,然后重试以获取Kafka抵消 maxOffsetsPerTrigger long none 流和批 每个触发间隔处理的最大偏移量的速率限制。指定的总偏移量将按不​​同卷的topicPartitions按比例分割。

卡夫卡自己的配置可以通过设置DataStreamReader.option与kafka.前缀,例如 stream.option("kafka.bootstrap.servers", "host:port")。有关可能的kafkaParams,请参阅 Kafka消费者配置文档。

请注意,以下Kafka参数无法设置,Kafka源将抛出异常:

group.id:Kafka源将自动为每个查询创建唯一的组ID。 "group.id" -> "use_a_separate_group_id_for_each_stream", auto.offset.reset:设置source选项startingOffsets以指定从哪里开始。结构化流式管理哪些偏移量在内部消耗,而不是依靠kafka消费者来做。当新的主题/分区被动态订阅时,这将确保不会丢失任何数据。请注意,startingOffsets只有在启动新的流式查询时才适用,并且恢复将始终从查询离开的地方获取。 "auto.offset.reset" -> "latest", key.deserializer:使用ByteArrayDeserializer的键始终反序列化为字节数组。使用DataFrame操作显式反序列化键。 "key.deserializer" -> classOf[StringDeserializer], value.deserializer:值始终使用ByteArrayDeserializer反序列化为字节数组。使用DataFrame操作来显式反序列化值。 "value.deserializer" -> classOf[StringDeserializer], enable.auto.commit:Kafka源不提交任何偏移量。 "enable.auto.commit" -> (false: java.lang.Boolean) interceptor.classes:Kafka源总是读取键和值作为字节数组。使用ConsumerInterceptor是不安全的,因为它可能会中断查询。

部署

与任何Spark应用程序一样,spark-submit用于启动应用程序。spark-sql-kafka-0-10_2.11 并且其依赖性可以直接添加到spark-submit使用中--packages,例如,

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1 ...

有关提交具有外部依赖关系的应用程序的更多详细信息,请参阅申请提交指南。

转载请注明原文地址: https://www.6miu.com/read-65942.html

最新回复(0)