Maven+Apache+Spring使用ActiveMQ避坑步骤

xiaoxiao2021-02-28  36

1.导入jar包:Spring 5 的 spring-jms 需要 JMS 2.0 的 API,而 ActiveMQ 5.x 自带的是 JMS 1.1,因此需要手动导入 jms 2.0 的 jar 包

<!-- JMS 2.0 API, declared explicitly and listed ahead of the ActiveMQ artifacts. -->
<!-- https://mvnrepository.com/artifact/javax.jms/javax.jms-api -->
<dependency>
    <groupId>javax.jms</groupId>
    <artifactId>javax.jms-api</artifactId>
    <version>2.0.1</version>
</dependency>

<!-- MQ -->
<!-- Spring JMS support (JmsTemplate, listener containers). -->
<!-- https://mvnrepository.com/artifact/org.springframework/spring-jms -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>5.0.6.RELEASE</version>
</dependency>

<!--
  NOTE(review): activemq-core 5.7.0 and activemq-all 5.15.3 are two different
  ActiveMQ releases declared side by side; they presumably ship overlapping
  classes. Confirm whether both are really required before changing either.
-->
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-core -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.3</version>
</dependency>
<!-- end MQ -->

2.新建消费者(监听器)

队列(Queue)的监听器:
package com.project.listener; import java.io.IOException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import javax.servlet.http.HttpSession; import org.springframework.beans.factory.annotation.Autowired; import com.fasterxml.jackson.databind.ObjectMapper; import com.project.vo.ReplyQueueDto; public class QueueMessageListener implements MessageListener{ @Autowired private HttpSession session; @Override public void onMessage(Message message) { System.out.println("session注入成功"+session); TextMessage tm = (TextMessage) message; try { System.out.println("QueueMessageListener监听到了文本消息:\t" + tm.getText()); ReplyQueueDto rqd = new ObjectMapper().readValue(tm.getText(), ReplyQueueDto.class); System.out.println("[查看是否转换成功了]"+rqd); //do something ... } catch (JMSException | IOException e) { e.printStackTrace(); } } }
主题(Topic)的监听器:
package com.project.listener; import java.io.IOException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.springframework.beans.factory.annotation.Autowired; import com.fasterxml.jackson.databind.ObjectMapper; import com.project.service.ITaskService; import com.project.vo.TaskTopicDto; public class TopicMessageListener implements MessageListener{ @Autowired private ITaskService taskService; @Override public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("TopicMessageListener监听到了文本消息:\t" + tm.getText()); TaskTopicDto ttd = new ObjectMapper().readValue(tm.getText(),TaskTopicDto.class); taskService.addTaskPending(ttd); System.out.println("添加待处理事件成功!"); //do something ... } catch (JMSException | IOException e) { e.printStackTrace(); } } }

3.spring-mq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd
                           http://www.springframework.org/schema/jms
                           http://www.springframework.org/schema/jms/spring-jms.xsd
                           http://activemq.apache.org/schema/core
                           http://activemq.apache.org/schema/core/activemq-core-5.15.3.xsd">

    <!-- ActiveMQ connection factory: the real factory that talks to the broker. -->
    <!-- NOTE(review): broker address and credentials are hard-coded here;
         consider moving them to an external properties file. -->
    <amq:connectionFactory id="amqConnectionFactory"
                           brokerURL="tcp://192.168.1.67:61616"
                           userName="u001"
                           password="123" />

    <!-- Spring caching connection factory: wraps and manages the real one above. -->
    <bean id="connectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- The target is the real factory that produces JMS connections. -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory" />
        <!-- Equivalent alternative: pass the target factory as a constructor argument. -->
        <!-- <constructor-arg ref="amqConnectionFactory" /> -->
        <!-- Number of sessions to cache. -->
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- Spring JmsTemplate message producers: start -->

    <!-- JmsTemplate for queues. -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Uses the Spring-provided connection factory defined above. -->
        <constructor-arg ref="connectionFactory" />
        <!-- Not publish/subscribe, i.e. queue mode. -->
        <property name="pubSubDomain" value="false" />
    </bean>

    <!-- JmsTemplate for topics; not used in this project. -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Uses the Spring-provided connection factory defined above. -->
        <constructor-arg ref="connectionFactory" />
        <!-- Publish/subscribe mode. -->
        <property name="pubSubDomain" value="true" />
    </bean>

    <!-- Spring JmsTemplate message producers: end -->

    <!-- Queue destination named reply_queue. -->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg name="name" value="reply_queue" />
    </bean>

    <!-- Topic destination named task_topic. -->
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="task_topic" />
    </bean>

    <!-- Message listener container for the queue (testing only, disabled). -->
    <!--
    <bean id="queueMessageListener" class="com.project.listener.QueueMessageListener" />
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="destinationQueue" />
        <property name="messageListener" ref="queueMessageListener" />
    </bean>
    -->

    <!-- Message listener container: delivers task_topic messages to the topic listener. -->
    <!-- NOTE(review): the listener class uses @Autowired; annotation processing is
         not enabled in this file, so it is presumably enabled elsewhere. Confirm. -->
    <bean id="topicMessageListener" class="com.project.listener.TopicMessageListener" />
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="destinationTopic" />
        <property name="messageListener" ref="topicMessageListener" />
    </bean>

</beans>

4.配置web.xml的listener

XML 约束(schema)参考文档:

http://activemq.apache.org/xml-reference.html

转载请注明原文地址: https://www.6miu.com/read-2629024.html

最新回复(0)