提供者的pom.xml和消费者的pom.xml和第一章一样 一.Producer提供者 发送消息的类:MessageSender.java
package com.rabbitmq.producer; import java.io.IOException; import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class MessageSender { private Logger logger = LoggerFactory.getLogger(MessageSender.class); //声明一个列队名字 private final static String QUEUE_NAME = "hello"; /** * 多个消费者同时消费一个列队测试 * 发送者不停的往MQ中发送消息 * @param message * @return */ public boolean sendMessageMany(String message){ //new一个rabbitmq的连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置需要连接的RabbitMQ的机器的地址,这里指向本机 factory.setHost("localhost"); //连接器 Connection connection = null; //通道 Channel channel = null; try { //尝试获取一个连接 connection = factory.newConnection(); //尝试创建一个通道 channel = connection.createChannel(); /*声明一个列队: * 1.queue队列的名字,是否持久化 为true则在rabbitMQ重启后生存, * 2.是否是排他性队列(别人看不到),只对当前连接有效,当前连接断开后,队列删除(设置了持久化也删除) * 3.自动删除,在最后一个连接断开后删除队列 * 4.其他参数 * */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); /*发布消息,注意这里调用了getBytes(),发送的其实是byte数组,接收方收到消息后,需要重新组装成String * 1.交换模式 * 2.控制消息发送到哪个队列 * 3.其他参数 * 4.body 消息,byte数组 * */ //循环往列队发布消息 int i = 0; String oldMessage = message; while(true){ message+=i++; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); logger.info("已发送:"+message); try { Thread.sleep(500);//500毫秒发送一次 } catch (InterruptedException e) { logger.info("线程被打断:"+message); } message=oldMessage;//还原消息 //此代码没用,只是为了死循环出现防止编译出错 if(!"q".equals("q")){ break; } } //关闭通道和链接(先关闭通道在关闭连接) channel.close(); connection.close(); } catch (IOException e) { logger.error("IO异常:"+e); return false; } catch (TimeoutException e){ logger.error("超时异常:"+e); return false; } return true; } }2.测试发送消息的Main:ManySenderMain.java
package com.rabbitmq.main; import com.rabbitmq.producer.MessageSender; public class ManySenderMain { public static void main(String[] args) { MessageSender messageSender = new MessageSender(); messageSender.sendMessageMany("hellow tiglle"); } }二.Consumer消费者:
1.接收消息的消费者类:
package com.rabbitmq.consumer; import java.io.IOException; import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class MessageRecive { private Logger logger = LoggerFactory.getLogger(MessageRecive.class); //多个消费者消费消息:增加了是哪个消费者的标记 public boolean manyConsume(String queueName,final String flag){ //连接rabbitmq ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel =null; //声明消费的queue try { connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(queueName,false,false,false,null); //这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String Consumer consumer = new DefaultConsumer(channel){ //重写父类方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException { String message = new String(body, "UTF-8"); logger.info("["+flag+"]接收到:" + message); } }; //上面是声明消费者,这里用 声明的消费者 消费 列队的消息 channel.basicConsume(queueName, true,consumer); //这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费 } catch (Exception e) { logger.error("出现异常:"+e); return false; } return true; } }3.测试启动多个消费者开始监听消费消息的Main方法:ManyReviceMain.java
package com.rabbitmq.main; import com.rabbitmq.consumer.MessageRecive; //多个消费之同时消费列队的消息 public class ManyReviceMain { //这个队列名字要和生产者中的名字一样,否则找不到队列 private final static String QUEUE_NAME = "hello"; /** * 测试多个客户端消费提供者的消息 * 可以看出消息会平均发送个每个消费者 * RabbitMQ的特性,称作“Round-robin dispatching”,消息会平均的发送给每一个消费者 * @param args */ public static void main(String[] args) { MessageRecive messageRecive1 = new MessageRecive(); messageRecive1.manyConsume(QUEUE_NAME, "consumer1"); MessageRecive messageRecive2 = new MessageRecive(); messageRecive2.manyConsume(QUEUE_NAME, "consumer2"); MessageRecive messageRecive3 = new MessageRecive(); messageRecive3.manyConsume(QUEUE_NAME, "consumer3"); MessageRecive messageRecive4 = new MessageRecive(); messageRecive4.manyConsume(QUEUE_NAME, "consumer4"); } }可以看到,提供者不停的发送消息,四个消费者是平均规律的分摊了消费者的消息。 RabbitMQ的特性,称作“Round-robin dispatching”,消息会平均的发送给每一个消费者