pykafka简单应用

xiaoxiao2021-02-28  107

''' 简单的pykafka生产,消费,再生产过程; ''' from pykafka import KafkaClient import json #消息生产者函数: def atcl_produce(topic):     client = KafkaClient(hosts = 'X.X.X.X:9092')     #设定host,port;X.X.X.X:配置好的kafka的IP地址     atcl_topic = client.topics[topic.encode('utf_8')]   #设定topic     for i in range(3):         msg = {'id':i,'article':'I am an economic articles','MD5':'2345','status':'10010'}    #模拟一个准备发送到kafka的message         print('开始生产topic为' + str(atcl_topic) +'的第' + str(i) + '篇文章')              with atcl_topic.get_sync_producer() as producer:         atcl_msg = json.dumps(msg)         producer.produce(atcl_msg.encode('utf-8'))         print('文章已经生产完') #消息消费者函数: def atcl_consume(topic):     client = KafkaClient(hosts = 'X.X.X.X:9092')     #设定host,port         atcl_topic = client.topics[topic.encode('utf_8')]   #设定准备读取的topic     atcl_consumer = atcl_topic.get_balanced_consumer(consumer_group = 'undo'.encode('utf-8'),                                             auto_commit_enable = True,           #设置为False的时候不需要添加consumer_group,直接连接topic即可取到消息                                             zookeeper_connect = 'X.X.X.X:2181'   #从zookeeper消费,zookeeper的默认端口为2181                                             )     print('文章已经消费')     return atcl_consumer if __name__ == '__main__':     atcl_produce('undo')                    #模拟生产topic为undo的文章     atcl_consumer = atcl_consume('undo')    #模拟消费topic为undo的文章     atcl_produce('done')                    #将topic为undo的文章转化为done,重新生产到topic里。     print('程序已经完成')

一些其他相关:

1.从kafka中一个topic提取消息,消息大致按照顺序被读取,但不严格按照;

2.消费后的消息还在kafka中,但是由于offset(偏移量)移动,所以继续读未读消息;

转载请注明原文地址: https://www.6miu.com/read-38604.html

最新回复(0)