rabbitmq 集成 spring mvc

xiaoxiao2021-03-01  24

引入包

<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.4.5.RELEASE</version> </dependency> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>4.1.6.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency>

rabbitmq.properties

## rabbitmq 基础参数配置 ## username=guest password=guest host=192.168.74.167 port=5672 virtual_host=/

rabbitmq.xml

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd "> <!-- 线程池配置 --> <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <!-- 核心线程数,默认为1 --> <property name="corePoolSize" value="10" /> <!-- 最大线程数,默认为Integer.MAX_VALUE --> <property name="maxPoolSize" value="50" /> <!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE --> <property name="queueCapacity" value="3000" /> <!-- 线程池维护线程所允许的空闲时间,默认为60s --> <property name="keepAliveSeconds" value="300" /> <!-- 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 --> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /> </property> </bean> <!-- 创建connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${host}" username="${username}" password="${password}" port="${port}" virtual-host="${virtual_host}"/> <!-- 通过指定下面的admin信息,当前productor中的exchange和queue会在rabbitmq服务器上自动生成 --> <rabbit:admin connection-factory="connectionFactory"/> <!-- 定义rabbit template 用于数据的接收和发送 --> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" /> <!-- 广播 --> <rabbit:fanout-exchange name="logs" durable="false" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="logs_1"></rabbit:binding> <rabbit:binding queue="logs_2"></rabbit:binding> </rabbit:bindings> </rabbit:fanout-exchange> <!-- 列队 --> <rabbit:queue name="logs_1" durable="false" auto-delete="false" exclusive="false"> </rabbit:queue> <rabbit:queue name="logs_2" durable="false" auto-delete="false" exclusive="false"> </rabbit:queue> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor"> <rabbit:listener queues="task_queue" ref="taskListenter"/> <rabbit:listener queues="logs_1" ref="fanoutListenter" response-exchange="logs"/> <rabbit:listener queues="logs_2" ref="fanoutListenter" response-exchange="logs"/> </rabbit:listener-container> </beans>

applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd"> <context:component-scan base-package="com.hhly.*" /> <!-- 加载rabbitmq --> <bean id="propertyPlaceholderConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <list> <value>classpath*:rabbitmq.properties</value> </list> </property> </bean> <!-- 引入配置文件 --> <import resource="rabbitmq.xml"/> </beans>

生产

package com.hhly.rabbitmq.spring.produce; public interface MQProducer { /** * 发送消息到指定队列 * @param queueKey * @param object */ public void sendDataToQueue(String queueKey, String message); /** * 发送广播信息 * @author jiangwei * @Version 1.0 * @CreatDate 2017年3月27日 下午2:17:38 * @param exchange * @param object */ public void sendDataToFanout(String exchange,String message); } package com.hhly.rabbitmq.spring.produce; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.core.MessageProperties; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class MQProducerImpl implements MQProducer { @Autowired private AmqpTemplate amqpTemplate; @Override public void sendDataToQueue(String queueKey, String message) { byte [] body= message.getBytes(); MessageProperties properties = new MessageProperties(); properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); //properties.setPriority(5); Message message2 = new Message(body,properties ); amqpTemplate.send(queueKey,message2); } @Override public void sendDataToFanout(String exchange, String message) { amqpTemplate.convertAndSend(exchange, "", message); } }

消费

package com.hhly.rabbitmq.spring.consumer; import java.io.UnsupportedEncodingException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.stereotype.Component; @Component public class FanoutListenter implements MessageListener{ @Override public void onMessage(Message message) { try { System.out.println(new String(message.getBody(),"UTF-8")); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }

测试类

package com.hhly.rabbitmq; import java.util.UUID; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import com.hhly.rabbitmq.spring.produce.MQProducer; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = {"classpath:applicationContext.xml"}) public class TestQueue { @Autowired MQProducer mqProducer; @Autowired private AmqpTemplate amqpTemplate; @Test public void sendFanout() { int i = 0; for(;;){ try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } String message = "hello,rabbmitmq!"+ i++; mqProducer.sendDataToFanout("logs", message); } } }
转载请注明原文地址: https://www.6miu.com/read-3350174.html

最新回复(0)