[问题记录]解决RabbitMQ消息丢失与重复消费问题

xiaoxiao2021-02-28  79

本文仅记录排查和问题定位、解决的过程。


1. 背景

最近用户反馈提交的SQL查询一直处于长时间等待状态,经过排查观察,发现部分查询请求丢失,导致用户提交的查询未被正常接收,继而长时间无响应。

现象:集市SQL控制台提交10个简单SQL查询 -> 消息发送方:发送10条消息至消息队列 -> 消息消费方:只消费了7条消息

2. 现状

2.1. 当前SQL查询的整体流程

生产者:PHP: 将用户的SQL查询记录在DB表,标识查询任务状态(f_status)为运行中;将DB表中的任务id、提交人等信息发送到RabbitMQ;消息队列:RabbitMQ: PHP消息提交到了交换机;交换机再把消息分发给指定的消息队列;消费者:Python: 主进程监听消息队列,一旦有消息就不停拉取;拉取一条消息,就从进程池调起一个空闲进程来处理消息;随后反馈ACK给消息队列,将消息从消息队列中移除;

2.2. 消息发送方:Web端

结论:消息发送正常 排查步骤:查看log

2.3. 消息队列

结论:消息数量正常 诊断步骤: 执行机安装rabbitmq-dump-queue插件,用于dump队列的消息; 1. 执行机:停止服务; 2. 用户:提交10个SQL查询: 3. 发送方:查看Web服务端的输出日志,确定10个消息已经往消息队列写; 4. 执行机:通过rabbitmq-dump-queue查看队列的消息,确认是正常10个消息写入;

watch -n 1 '$GOPATH/src/rabbitmq-dump-queue/rabbitmq-dump-queue -uri="amqp://guest:guest@xxxxx:5672" -queue ph_open_task'

5. 执行机:启动服务,消息队列中的消息全部被接收;

2.4. 消息接收方

代码逻辑:

try: pool = Pool(processes=40) def callback(ch, method, properties, body): try: doSomething... pool.apply_async(process) except Exception as e: print traceback.format_exc() logger_msg.info(traceback.format_exc()) finally: // 这里会有问题,即使消息未被处理也会反馈ACK给RabbitMQ ch.basic_ack(delivery_tag=method.delivery_tag) while True: try: connection = pika.BlockingConnection( pika.ConnectionParameters(host='xxxxxxxx')) channel = connection.channel() channel.queue_declare(queue=queue_name, durable=True) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue=queue_name, no_ack=False) channel.start_consuming() except pika.exceptions.ConnectionClosed as e: continue except Exception as e: logger_msg.info(traceback.format_exc()) finally: channel.basic_ack(delivery_tag=method.delivery_tag) pool.close() pool.join()

结论:本例中消费者主进程将持续监听MQ,一旦MQ有消息将会拉取,随后从进程池中启动子进程来处理消息,但是从进程池启动子进程的过程并不一定成功(若当前进程池没有空闲子进程),而主进程不管任何情况下都给MQ发送ACK状态码,从而MQ将未处理的消息移除掉,导致消息丢失

3. 方案

问题是在消费者环节产生,因此对消费者做改动,需要调整消费者的架构: * 原来逻辑:使用进程池技术,主进程负责监听、接收MQ的消息,子进程负责执行MQ的消息,缺点是单一的主进程无法简单处理ACK状态码,不易维护; * 现有逻辑:使用RabbitMQ自身特性(work_queue),消费者不再维护进程池,是单进程,负责监听、接收、处理MQ的消息,处理完了以后再反馈ACK状态码,进程与进程之间互不干扰,易维护,并发量大时可随时增加消费者进程;

目前方案的问题以及解决方案:

问题1:消息重复消费 描述:用户在页面停止查询时,会导致消费者进程被杀死,因此ACK状态码未反馈至MQ,从而消息一直存留在MQ中,当新的消费者启动时会重新消费; 解决方案:消费者每次执行查询前,首先在DB上查询任务的执行状态,若处于「取消/失败/成功」则表示已经由其它消费者消费过,那么直接返回ACK状态码给MQ,将消息从MQ中移除;

问题2:进程池如何维护? 描述:用户在页面停止查询时,会导致消费者进程被杀死,导致消费者数量减少; 解决方案:维护一个监控脚本,每分钟轮询消费者进程数,若少于40个进程,则新启动一个消费者,直到数量足够;

转载请注明原文地址: https://www.6miu.com/read-59826.html

最新回复(0)