RabbitMQ消息可靠送达

xiaoxiao2021-02-28  99

目前项目中采用RabbitMQ,在运行的过程当中,发现有一部分消息存在丢失的情况,结合自己对RabbitMQ的理解,一般分为两种情况,一种是客户端丢失,即消息没有成功送达到RabbitMQ Server,一种是消费端并有成功消费消息,针对这两种情况,我们分别做了相应的方案。

一、消息没有送达到RabbitMQ Server,导致丢失

由于看不了RabbitMQ Server的消息日志,我们自己做了一个消息日志表,操作步骤如下:

1、在业务操作成功之后,把消息持久化到DB中,这两步操作放在同一个事务当中,要么都成功,要么都失败。

2、事务提交成功之后,单独启动一个线程进行消息的发送

3、启用RabbitMQ的消息确认(message acknowledgement)机制,我们使用的是Spring,首先就要修改Spring-rabbitmq.xml中的配置。配置如下:

<rabbit:connection-factory id="rabbitmqConnectionFactory" virtual-host="${rabbitmq.vhost}" channel-cache-size="25" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" publisher-confirms="true"/> <rabbit:admin connection-factory="rabbitmqConnectionFactory"/> <rabbit:template id="amqpTemplate" exchange="${exchangeName}" connection-factory="rabbitmqConnectionFactory" confirm-callback="confirmCallBackListener" return-callback="returnCallBackListener" mandatory="true"/> 关键参数已用红色字体标识。注意:mandatory必须设置为true,否则回调不会生效。

confirmCallBackListener类的代码如下:

@Service("confirmCallBackListener") public class ConfirmCallBackListener implements ConfirmCallback{ private static Logger logger = LoggerFactory.getLogger(ConfirmCallBackListener.class); @Autowired private IMessageLogService messageLogService; @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause); try { if (!ack) { logger.info("send message failed: " + cause + correlationData.toString()); } else {//发送成功,更新消息日志表数据 logger.info("send message successful !"); MessageLog vo = new MessageLog(); vo.setToken(correlationData.getId()); vo.setSendStatus(ConstantUtils.SENT); messageLogService.saveOrUpdateMessageLog(vo); logger.info("update message send status ==="+ JSON.toJSONString(vo)); } }catch (Exception e){ logger.error("message confirm exception :"+e); } } } returnCallBackListener类的代码如下:

@Service("returnCallBackListener") public class ReturnCallBackListener implements ReturnCallback{ private static Logger logger = LoggerFactory.getLogger(ReturnCallBackListener.class); @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { try { System.out.println("return--message:"+new String(message.getBody())+",replyCode:"+replyCode+",replyText:"+replyText+",exchange:"+exchange+",routingKey:"+routingKey); //做相应的业务操作 }catch (Exception e){ logger.error("message return callback exception:"+e); } } } 4、如果出现RabbitMQ Server连接不上,我们做了一个定时任务,扫描message_log表,如果存在消息没有确认,则会向RabbitMQ Server一直重发,直到发送成功。

二、消费端消费消息失败或有其它异常

1、消费者在消费消息成功时给到生产者回执,这时发送端收到消费者回执之后,更改message-log表的发送状态。则整个过程结束。

2、采用RabbitMQ本身的机制进行消息回执。配置如下:

<rabbit:listener-container connection-factory="rabbitmqConnectionFactory" acknowledge="manual" > <rabbit:listener queues="your_queue_name" ref="receiveConfirmListener" /> </rabbit:listener-container>

receiveCallBackListener类的代码如下:

@Service("receiveConfirmListener") public class ReceiveConfirmListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { try { System.out.println("consumer--:" + message.getMessageProperties() + ":" + new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); //TODO 业务处理 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } }这样基本就保证了消息的可靠送达

如果有别的同学有更好的方案,欢迎吐槽。

注:MessageLog对象是一张消息日志表,表结构大致如下:

转载请注明原文地址: https://www.6miu.com/read-52868.html

最新回复(0)