提供者和消费者分开了两个不同的项目 一.Producer:提供者 1.pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tiglle</groupId> <artifactId>Application-rabbitmq</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <!-- rabbitmq依赖 --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version> </dependency> <!-- logback日志 --> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.1</version> </dependency> </dependencies> </project>2.提供者的类:MessageSender.java
package com.rabbitmq.producer; import java.io.IOException; import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class MessageSender { private Logger logger = LoggerFactory.getLogger(MessageSender.class); //声明一个列队名字 private final static String QUEUE_NAME = "hello"; public boolean sendMessage(String message){ //new一个rabbitmq的连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置需要连接的RabbitMQ的机器的地址,这里指向本机 factory.setHost("localhost"); //连接器 Connection connection = null; //通道 Channel channel = null; try { //尝试获取一个连接 connection = factory.newConnection(); //尝试创建一个通道 channel = connection.createChannel(); /*声明一个列队: * 1.queue队列的名字 * 2.是否持久化 为true则在rabbitMQ重启后生存 * 3.是否是排他性队列(别人看不到),只对当前连接有效,当前连接断开后,队列删除(设置了持久化也删除) * 4.自动删除,在最后一个连接断开后删除队列 * 5.其他参数 * */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); /*发布消息,注意这里调用了getBytes(),发送的其实是byte数组,接收方收到消息后,需要重新组装成String * 1.指定交换器(如果没声明交换器,rabbit服务器将使用名称为空""类型为direct类型的交换器) * 2.routingKey(当使用默认Exchange时,routingKey为列队的名称) * 3.其他参数 * 4.body 消息,byte数组 * */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); logger.info("已发送:"+message); //关闭通道和链接(先关闭通道在关闭连接) channel.close(); connection.close(); } catch (IOException e) { logger.error("IO异常:"+e); return false; } catch (TimeoutException e){ logger.error("超时异常:"+e); return false; } return true; } }3.测试使用提供者发送列队消息的Main方法:RabbitmqMain.java
package com.rabbitmq.main; import com.rabbitmq.producer.MessageSender; public class RabbitmqMain { //调用发布消息 //消息发布以后会发布到指定的列队等待消费者消费,发送n个,消费者就会消费n次 //消费者一直在等待消息,每次有消息进来,就会立刻消费掉 public static void main(String[] args) { MessageSender messageSender = new MessageSender(); messageSender.sendMessage("hellow tiglle"); } }二.Consumer消费者 1.pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tiglle</groupId> <artifactId>Application-rabbitmq</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <!-- rabbitmq依赖 --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version> </dependency> <!-- logback日志 --> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.1</version> </dependency> </dependencies> </project>2.消费消息的类:MessageRecive.java
package com.rabbitmq.consumer; import java.io.IOException; import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class MessageRecive { private Logger logger = LoggerFactory.getLogger(MessageRecive.class); //普通接收消息 public boolean consumer(String queueName){ //连接Rabbitmq ConnectionFactory factory = new ConnectionFactory(); //rabbit所造的机器地址(域名或者ip) factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { //新建连接 connection = factory.newConnection(); //新建通道 channel = connection.createChannel(); //这里声明queue是为了取消息的时候,queue肯定会存在(没懂) //注意,queueDeclare是幂等的,也就是说,消费者和生产者,不论谁先声明,都只会有一个queue //声明一个列队,参数和提供者的一样 channel.queueDeclare(queueName,false,false,false,null); //这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String Consumer consumer = new DefaultConsumer(channel){ //重写父类方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException { String message = new String(body, "UTF-8"); logger.info("接收到:" + message); } }; //上面是声明消费者,这里用 声明的消费者 消费 列队的消息 channel.basicConsume(queueName, true,consumer); //这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费 } catch (IOException e) { logger.error("IO异常:"+e); return false; } catch (TimeoutException e) { logger.error("超时异常:"+e); return false; } return true; } }3.测试启动消费者监听消息的Main:RabbitmqMain.java
package com.rabbitmq.main; import com.rabbitmq.consumer.MessageRecive; public class RabbitmqMain { //执行消费者 //这个队列名字要和生产者中的名字一样,否则找不到队列 private final static String QUEUE_NAME = "hello"; //消费生产者的消息,一旦消费者启动,会一直监听生产者的消息,一旦生成者发送消息到列队,马上会消费 //消费者一直在等待消息,每次有消息进来,就会立刻消费掉 public static void main(String[] args) { MessageRecive messageRecive = new MessageRecive(); messageRecive.consumer(QUEUE_NAME); } }总结:无论先启动消费者还是提供者都可以,消费者一启动,就会开始监听,一旦提供者发送消息到列队,消费者就会处理