kafka 使用python消费consumer

xiaoxiao2021-02-28  39

参考

python kafka 使用 大数据:kafka常见问题 kafka python之操作kafka

Kafka基本了解

使用python读取consumer中的数据

安装kafka-python

pip install kafka-python

简单使用

import kafka import KafkaConsumer #消费kafka中最新的数据 并且自动提交offsets[消息的偏移量] consumer = KafkaConsumer('my-topic',group_id='my-group',bootstrap_servers=['localhost:9092']) for message in consumer: #注意: message ,value都是原始的字节数据,需要decode #例如: message.value.decode('utf-8') #完成对每条数据中的操作 print ("%s:%d:%d: key=%s value=%s" %s (message.topic, message.partition, message.offset, message.key, message.value))

其他使用

topic="****" groupid="****" brokerlist="*:9092,*:9092" #读取目前可读最早的消息 consumer = KafkaConsumer(topic, auto_offset_reset='earliest', bootstrap_servers=brokerlist) #获取topic主题的分区信息 consumer.partitions_for_topic(topic) #获取主题列表 print consumer.topics() #获取当前消费者订阅的主题 print consumer.subscription() #获取当前消费者topic、分区信息 print consumer.assignment() #获取当前消费者可消费的偏移量 print consumer.beginning_offsets(consumer.assignment()) #重置偏移量,从第5个偏移量消费 consumer.seek(TopicPartition(topic=topic, partition=0), 5) #获取当前主题的最新偏移量 print consumer.position(TopicPartition(topic=u'test', partition=0)) #消费多个主题 consumer = KafkaConsumer(bootstrap_servers=brokerlist) consumer.subscribe(topics=('test','test0')) #订阅要消费的主题 for message in consumer: 。。。。 #手动拉取消息 while True: msg = consumer.poll(timeout_ms=5) #从kafka获取消息 print msg time.sleep(1) #消息挂起与恢复 consumer.pause(TopicPartition(topic=u'test', partition=0)) print consumer.paused() #获取当前挂起的消费者 #处理操作 consumer.resume(TopicPartition(topic=u'test', partition=0)) #pause执行后,consumer不能读取,直到调用resume后恢复。
转载请注明原文地址: https://www.6miu.com/read-2627281.html

最新回复(0)