spark streaming 自定义kafka读取topic的offset(python)

xiaoxiao2021-02-28  72

使用spark streaming 处理kafka数据,有时候程序出现异常,或者需要修改程序再次运行,就可能会造成这样的情况:

kafka中的数据读取出来了,zookeeper中已经保存了读取的offset,但是数据处理出了异常,那修改程序后再次运行就不会再处理这部分数据了。原有的程序需要修改后再运行,kill掉之后再运行,这时可能kafka的offset还没有提交到zookeeper,修改程序后再次运行会有部分数据重复处理。

由于上面这些问题,所以希望可以自己人工地管理kafka中数据读取的状态。

原来是使用下面的方式创建kafka流:

kvs = KafkaUtils.createStream(ssc, "xxx.xxx.xxx.xxx:2182/kafka", group_id, {topic:1},{'auto.offset.reset':'smallest'})

这种方式是使用zookeeper来管理kafka中topic中每个partition读取的offset。

上网搜了一些资料,发现可以使用createDirectStream来自定义设置读取的offset:

from pyspark.streaming import StreamingContext from pyspark import SparkContext,SparkConf from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition spark_conf = SparkConf() sc = SparkContext(conf=spark_conf) ssc = StreamingContext(sc, 30) topic = "test" partition = 0 start = 0 topicPartion = TopicAndPartition(topic,partition) fromOffset = {topicPartion: long(start)} directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": 'xxx.xxx.xxx.xxx:9092'}, fromOffsets=fromOffset)

其中,fromOffset保存topic partition对应的offset的信息,例子中test这个topic只有一个partition,设置从offset=0的位置开始读。

在数据处理完之后,希望可以保存数据读取位置的状态:

offsetRanges = [] def storeOffsetRanges(rdd): global offsetRanges offsetRanges = rdd.offsetRanges() return rdd def printOffsetRanges(rdd): print rdd.count() for o in offsetRanges: print "__________________________________" print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset) print "__________________________________" directKafkaStream\ .transform(storeOffsetRanges)\ .foreachRDD(printOffsetRanges) 例子中的storeOffsetRanges函数将数据读取的偏移信息保存在了offsetRanges中,在printOffsetRanges中打印出了rdd的topic、partition、读取数据最小的offset,最大的offset。

这样我们也可以将这些信息保存到mysql、mongodb等,在以后需要重启spark streaming任务的时候,就可以从数据库中保存的上次的offset开始读取数据。

api:http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils

github:https://github.com/Stratio/spark-kafka

stackoverflow: http://stackoverflow.com/questions/33268689/how-to-create-inputdstream-with-offsets-in-pyspark-using-kafkautils-createdirec

转载请注明原文地址: https://www.6miu.com/read-82446.html

最新回复(0)