三.防止消息丢失

xiaoxiao2021-02-27  235

提供者和消费者的pom.xml和上一章一样

RabbitMQ中,消息丢失可以简单的分为两种:客户端丢失和服务端丢失。针对这两种消息丢失,RabbitMQ都给出了相应的解决方案。

默认情况下,RabbitMQ会平均的分发消费给多个消费者,假设一个消费者任务的执行时间非常长,在执行过程中,客户端挂了(连接断开),那么,该客户端正在处理且未完成的消息,以及分配给它还没来得及执行的消息,都将丢失。因为默认情况下,RabbitMQ分发完消息后,就会从内存中把消息删除掉。

解决方法:消息确认: RabbitMQ引入了消息确认机制,当消息处理完成后,给Server端发送一个确认消息,来告诉服务端可以删除该消息了,如果连接断开的时候,Server端没有收到消费者发出的确认信息,则会把消息转发给其他保持在线的消费者。

上代码:

一.Producer提供者: 1.发送消息的类 2.测试启动发送小的Main 和上一章的提供者一样

二.Consumer消费者 1.消费者的类:MessageRecive.java 分两个部分,重现消息丢失的情况和解决方案

package com.rabbitmq.consumer; import java.io.IOException; import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class MessageRecive { private Logger logger = LoggerFactory.getLogger(MessageRecive.class); //测试客户端丢失消息的情况,设置10秒处理事件,处理的时候强制关闭 public boolean lostMessageConsumer(String queueName){ //连接rabbitmq ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel =null; //声明消费的queue try { connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(queueName,false,false,false,null); //这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String Consumer consumer = new DefaultConsumer(channel){ //重写父类方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException { String message = new String(body, "UTF-8"); logger.info("接收到:" + message); //休眠10秒,模拟10秒处理事件 try { logger.info("开始处理消息(休眠)......"); Thread.sleep(10000); System.out.println("处理完毕!"); } catch (Exception e) { // TODO: handle exception } } }; //上面是声明消费者,这里用 声明的消费者 消费 列队的消息 System.out.println("开始等待提供者的消息...."); //自动应答,消费者自动应答给提供者(手动应答为false,需要开发者手动应答) boolean autoAck = true; channel.basicConsume(queueName, autoAck,consumer); //这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费 } catch (Exception e) { logger.error("出现异常:"+e); return false; } return true; } //解决客户端丢失消息的情况,设置10秒处理事件,处理的时候强制关闭(原理:消息确认机制) /** * RabbitMQ引入了消息确认机制,当消息处理完成后,给Server端发送一个确认消息, * 来告诉服务端可以删除该消息了,如果连接断开的时候,Server端没有收到消费者发 * 出的确认信息,则会把消息转发给其他保持在线的消费者。 * @param queueName * @return */ public boolean resolveLostMessageConsumer(String queueName){ //连接rabbitmq ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try { Connection connection = factory.newConnection(); //解决内部类只能访问final修饰的局部变量 final Channel channel = connection.createChannel(); //声明消费的queue channel.queueDeclare(queueName,false,false,false,null); //在消息确认之前,不在处理其他消息 //prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack channel.basicQos(1); //这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String(局部内部类) Consumer consumer = new DefaultConsumer(channel){ //重写父类方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException { String message = new String(body, "UTF-8"); logger.info("接收到:" + message); //休眠10秒,模拟10秒处理事件 try { logger.info("开始处理消息(休眠)......"); Thread.sleep(10000); System.out.println("处理完毕!"); } catch (Exception e) { // TODO: handle exception }finally { //手动应答,告诉服务器可以删除消息,否则不删除或给其他消费者 /** * @param deliveryTag the tag from the received 这个是RabbitMQ用来区分消息的,文档在这(https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.deliver.delivery-tag) * @param multiple true to acknowledge all messages up to and 为true的话,确认所有消息,为false只确认当前消息 */ channel.basicAck(envelope.getDeliveryTag(), false); } } }; //上面是声明消费者,这里用 声明的消费者 消费 列队的消息 System.out.println("开始等待提供者的消息...."); //关闭自动应答(接收到消息提供者就删除消息),改为手动应答,很重要 boolean autoAsk = false; channel.basicConsume(queueName, autoAsk,consumer); //这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费 } catch (Exception e) { logger.error("出现异常:"+e); return false; } return true; } }

2.测试消息丢失的情况的Main:LostMessageMain.java

package com.rabbitmq.main; import com.rabbitmq.consumer.MessageRecive; public class LostMessageMain { //从哪个列队取消息 private final static String QUEUE_NAME = "hello"; //测试客户端丢失消息的情况,当程序正在执行时,出现异常(强制中断程序),正在处理的和等待处理的消息都会丢失 public static void main(String[] args) { MessageRecive messageRecive = new MessageRecive(); messageRecive.lostMessageConsumer(QUEUE_NAME); } }

当程序执行到一般时强制关闭程序,然后在到RabbitMq的消息列队中查看消息(http://localhost:15672/),发现列队要发给此消费者的消息s已经被删除。

3.测试解决消息丢失(消息确认)的Main:ResolveLostMessageMain.java

package com.rabbitmq.main; import com.rabbitmq.consumer.MessageRecive; public class ResolveLostMessageMain { //指定需要消费哪个列队的消息 private static final String QUEUE_NAME = "hello"; //解决客户端消息丢失的方法(消息确认机制),生产两条消息,执行时强制关闭程序,发现消息还在 //如果有其他消费者,会发送给其他消费者 /** * RabbitMQ引入了消息确认机制,当消息处理完成后,给Server端发送一个确认消息, * 来告诉服务端可以删除该消息了,如果连接断开的时候,Server端没有收到消费者发 * 出的确认信息,则会把消息转发给其他保持在线的消费者。 * 自动确认:接收到消息后就向提供者发送确认消息,提供者就删除消息 * 手动确认:开发者认为消息处理完毕手动向提供者发送确认消息,提供者才从列队删除消息 * */ public static void main(String[] args) { MessageRecive messageRecive = new MessageRecive(); messageRecive.resolveLostMessageConsumer(QUEUE_NAME); } }

当消息执行到一半,强制关闭程序,再去查看列队消息,发现此消费者的消息s都还在,如果有多个消费者 ,其中一个消费者挂掉了,mq不会删除列队的消息,会发送给其他消费者消费。

转载请注明原文地址: https://www.6miu.com/read-9500.html

最新回复(0)