准备工作
RabbitMQ版本:rabbitmq_server-3.6.1
Spring版本:4.3
POM配置:
<!-- rabbitmq jar -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.6.1.RELEASE</version>
</dependency>
1. Spring配置文件:spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring context for the RabbitMQ example: connection factory, admin, template,
  listener container, one durable queue and one durable direct exchange.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- Versionless XSD locations resolve to the schema bundled with the jars on the classpath. -->

    <!-- Broker connection settings (mq.host, mq.port, mq.username, mq.password) come from this file. -->
    <context:property-placeholder location="classpath:application.properties"/>

    <rabbit:connection-factory id="connectionFactory"
                               host="${mq.host}"
                               username="${mq.username}"
                               password="${mq.password}"
                               port="${mq.port}"/>

    <!--
      Single RabbitAdmin for the context. The explicit id keeps the bean name
      "rabbitAdmin" available for injection without defining a second admin
      instance on the same connection factory.
    -->
    <rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/>

    <!-- RabbitTemplate used to send and receive; defaults to the exchange and routing key declared below. -->
    <rabbit:template id="amqpTemplate"
                     connection-factory="connectionFactory"
                     exchange="change_name"
                     routing-key="routing_name"/>

    <!-- Listener-based consumption: messageReceiver is invoked for each message on queue_name. -->
    <bean id="messageReceiver" class="com.willow.rabbitmq.QueueListenter"/>

    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="queue_name" ref="messageReceiver"/>
    </rabbit:listener-container>

    <!-- Queue definition: durable, not auto-deleted, not exclusive. -->
    <rabbit:queue id="queue_name" name="queue_name" durable="true" auto-delete="false" exclusive="false"/>

    <!-- Direct exchange definition, with queue_name bound under routing key routing_name. -->
    <rabbit:direct-exchange name="change_name" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="queue_name" key="routing_name"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
</beans>
2.RabbitMq发送消息接收消息
package com.willow.rabbitmq;

import net.sf.json.JSONSerializer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * Integration tests showing two ways to publish to RabbitMQ: declaring the
 * queue, exchange and binding at runtime through {@link RabbitAdmin}, and
 * relying on the queue/exchange binding configured in the Spring XML through
 * {@link AmqpTemplate}.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:spring/spring-rabbitmq-consumer.xml"})
public class RabbitMQ {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQ.class);

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    /**
     * Ensures an exchange and queue named {@code queueName} exist and are bound,
     * then publishes {@code message} to that exchange using {@code queueName}
     * as the routing key.
     *
     * @param queueName name used for the queue, the exchange and the routing key
     * @param message   payload to publish
     * @throws Exception if declaration or publishing fails
     */
    public void sendMessage(String queueName, String message) throws Exception {
        declareBinding(queueName, queueName);
        rabbitAdmin.getRabbitTemplate().convertAndSend(queueName, queueName, message);
    }

    /** Publishes 100 messages ("abc0" .. "abc99") through {@link #sendMessage}. */
    @Test
    public void sendTest() throws Exception {
        for (int i = 0; i < 100; i++) {
            sendMessage("queue_name", "abc" + i);
        }
    }

    /**
     * Polls {@code queue_name} once and logs the message body, if a message
     * was available.
     */
    public void receiveMessage() {
        Message message = rabbitAdmin.getRabbitTemplate().receive("queue_name");
        if (message != null) {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            String mes = new String(message.getBody(), StandardCharsets.UTF_8);
            LOGGER.info("##########message:{}", mes);
        }
    }

    /**
     * Approach 1: declare the exchange, the queue and their binding at runtime
     * through {@code rabbitAdmin}.
     *
     * <p>The queue is only declared when it does not exist yet, so a queue that
     * was created elsewhere with different arguments is left alone. The exchange
     * and the binding are declared on every call: a queue that already exists
     * (for example one defined in the XML configuration) does not imply that an
     * exchange with the requested name exists, and publishing to a missing
     * exchange would lose the message.
     *
     * <p>Exchange and routing key are passed explicitly on every send, so the
     * defaults of the admin's template are not modified here.
     *
     * @param exchangeName name of the topic exchange to declare
     * @param queueName    name of the queue; also used as the binding routing key
     */
    protected void declareBinding(String exchangeName, String queueName) {
        Queue queue = new Queue(queueName, true, false, false, null);
        if (rabbitAdmin.getQueueProperties(queueName) == null) {
            rabbitAdmin.declareQueue(queue);
        }

        TopicExchange exchange = new TopicExchange(exchangeName);
        rabbitAdmin.declareExchange(exchange);

        // Bind the queue to the exchange using the queue name as routing key.
        Binding binding = BindingBuilder.bind(queue).to(exchange).with(queueName);
        rabbitAdmin.declareBinding(binding);
    }

    /**
     * Approach 2: rely on the queue binding configured in XML and publish
     * through {@code amqpTemplate} using its default exchange and routing key.
     *
     * @throws Exception if serialization or publishing fails
     */
    @Test
    public void send() throws Exception {
        Map<String, Object> bodyMap = new HashMap<String, Object>();
        bodyMap.put("message", "rabbitMq_test");
        String jsonStr = JSONSerializer.toJSON(bodyMap).toString();
        amqpTemplate.convertAndSend(jsonStr);
    }
}
3.消息监听
package com.willow.rabbitmq; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; /** * Queue监听器 */ public class QueueListenter implements MessageListener { public void onMessage(Message msg) { try{ String str=new String(msg.getBody(),"UTF-8"); System.out.println("监听器:message"+str); }catch(Exception e){ e.printStackTrace(); } } }