四.消息的持久化

xiaoxiao2021-02-27  247

当rabbitMq重启的时候,消息依然会丢失。

RabbitMQ提供了持久化的机制,将内存中的消息持久化到硬盘上,即使重启RabbitMQ,消息也不会丢失。但是,仍然有一个非常短暂的时间窗口(RabbitMQ收到消息还没来得及存到硬盘上)会导致消息丢失,如果需要严格的控制,可以参考官方文档(https://www.rabbitmq.com/confirms.html)。

提供者Producer和消费者Consomer的pom.xml和上一章的一样。

一.提供者Consumer 1.发送消息的类:MessageSender.java

package com.rabbitmq.producer; import java.io.IOException; import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class MessageSender { private Logger logger = LoggerFactory.getLogger(MessageSender.class); //声明一个列队名字 private final static String QUEUE_NAME = "hello"; /** * 测试rabbit的消息持久化 * RabbitMQ提供了持久化的机制,将内存中的消息持久化到硬盘上,即使重启RabbitMQ, * 消息也不会丢失。但是,仍然有一个非常短暂的时间窗口(RabbitMQ收到消息还没来得及存到硬盘上) * 会导致消息丢失,如果需要严格的控制,可以参考官方文档(https://www.rabbitmq.com/confirms.html) * @param queueName * @return */ public boolean sendMessageDurable(String message){ //new一个rabbitmq的连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置需要连接的RabbitMQ的机器的地址,这里指向本机 factory.setHost("localhost"); //连接器 Connection connection = null; //通道 Channel channel = null; try { //尝试获取一个连接 connection = factory.newConnection(); //尝试创建一个通道 channel = connection.createChannel(); /*声明一个列队: * 1.queue队列的名字 * 2.是否持久化,是否持久化 为true则在rabbitMQ重启后生存, * 3.自动删除,在最后一个连接断开后删除队列 * 4.其他参数 * */ //注意,RabbitMQ不允许对一个已经存在的队列用不同的参数重新声明,对于试图这么做的程序,会报错,所以,改动之前代码之前,要在控制台中把原来的队列删除 //IO异常:java.io.IOException boolean durable = true;//是否持久化消息,无论是提供者还是消费者,都可以设置, //不过RabbitMQ不允许对一个已经存在的队列用不同的参数重新声明,所有提供者或者 //消费者谁先声明后者不能重新声明 channel.queueDeclare(QUEUE_NAME,durable,false,false,null); /*发布消息,注意这里调用了getBytes(),发送的其实是byte数组,接收方收到消息后,需要重新组装成String * 1.交换模式 * 2.控制消息发送到哪个队列 * 3.其他参数 * 4.body 消息,byte数组 * */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); logger.info("已发送:"+message); //关闭通道和链接(先关闭通道在关闭连接) channel.close(); connection.close(); } catch (IOException e) { logger.error("IO异常:"+e); return false; } catch (TimeoutException e){ logger.error("超时异常:"+e); return false; } return true; } }

2.测试发送消息的Mian:DurableMessageMain.java

package com.rabbitmq.main; import com.rabbitmq.producer.MessageSender; public class DurableMessageMain { /** * 测试消息持久化 * @param args */ public static void main(String[] args) { MessageSender messageSender = new MessageSender(); messageSender.sendMessageDurable("hellow tiglle"); } }

二.消费者Consumer 1.接收持久化消息的类

package com.rabbitmq.consumer; import java.io.IOException; import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class MessageRecive { private Logger logger = LoggerFactory.getLogger(MessageRecive.class); /** * 测试rabbit的消息持久化 * RabbitMQ提供了持久化的机制,将内存中的消息持久化到硬盘上,即使重启RabbitMQ, * 消息也不会丢失。但是,仍然有一个非常短暂的时间窗口(RabbitMQ收到消息还没来得及存到硬盘上) * 会导致消息丢失,如果需要严格的控制,可以参考官方文档(https://www.rabbitmq.com/confirms.html) * @param queueName * @return */ public boolean durabletMessageConsumer(String queueName){ //连接rabbitmq ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try { Connection connection = factory.newConnection(); //解决内部类只能访问final修饰的局部变量 final Channel channel = connection.createChannel(); //声明消费的queue //注意,RabbitMQ不允许对一个已经存在的队列用不同的参数重新声明,对于试图这么做的程序,会报错,所以,改动之前代码之前,要在控制台中把原来的队列删除 //IO异常:java.io.IOException boolean durable = true;//是否持久化消息,无论是提供者还是消费者,都可以设置, //不过RabbitMQ不允许对一个已经存在的队列用不同的参数重新声明,所有提供者或者 //消费者谁先声明后者不能重新声明 channel.queueDeclare(queueName,durable,false,false,null); //在消息确认之前,不在处理其他消息 //prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack channel.basicQos(1); //这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String(局部内部类) Consumer consumer = new DefaultConsumer(channel){ //重写父类方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException { String message = new String(body, "UTF-8"); logger.info("接收到:" + message); //休眠10秒,模拟10秒处理事件 try { logger.info("开始处理消息(休眠)......"); Thread.sleep(10000); System.out.println("处理完毕!"); } catch (Exception e) { // TODO: handle exception }finally { //手动应答,告诉服务器可以删除消息,否则不删除或给其他消费者 /** * @param deliveryTag the tag from the received 这个是RabbitMQ用来区分消息的,文档在这(https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.deliver.delivery-tag) * @param multiple true to acknowledge all messages up to and 为true的话,确认所有消息,为false只确认当前消息 */ channel.basicAck(envelope.getDeliveryTag(), false); } } }; //上面是声明消费者,这里用 声明的消费者 消费 列队的消息 System.out.println("开始等待提供者的消息...."); //关闭自动应答,改为手动应答,很重要 boolean autoAsk = false; channel.basicConsume(queueName, autoAsk,consumer); //这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费 } catch (Exception e) { logger.error("出现异常:"+e); return false; } return true; } }

2.测试接收持久化消息的Mian方法:

package com.rabbitmq.main; import com.rabbitmq.consumer.MessageRecive; public class durableMessageMain { //从哪个列队取消息 private final static String QUEUE_NAME = "hello"; public static void main(String[] args) { MessageRecive messageRecive = new MessageRecive(); messageRecive.durabletMessageConsumer(QUEUE_NAME); } }

此时,重启rabbitMq后,消息依然存在

转载请注明原文地址: https://www.6miu.com/read-11187.html

最新回复(0)