ActiveMQ的入门案例以及整合Spring的简单使用

xiaoxiao2021-02-28  5

先来个ActiveMQ介绍哈:

 MQ全称为Message Queue, 消息队列MQ)是一种应用程序对应用程序的通信方法,是一个消息中间件。   

 应用场景:为了实现系统之间的通信,把系统之间的调用耦合度降低就可以使用MQ

1) activeMQ 是Apache出品,最流行的,能力强劲的开源消息总线。

2) avtiveMQ主要特点:完全支持JMS1.1和J2EE 1.4规范;支持spring,很容易内嵌到spring中;支持ajax

3) activeMQ的消息形式:

a) 点对点形式,即生产者和接收者一一对应

b) 发布/订阅形式,即一个生产者发布消息后,可以有多个接收者订阅接收。

4) JMS五种消息正文格式:

a) StreamMessage -- Java原始值的数据流

b) MapMessage--一套名称-值对

c) TextMessage--一个字符串对象(常用)

d) ObjectMessage--一个序列化的 Java对象

e) BytesMessage--一个字节的数据流

好了直接我开始ActiveMQ的入门案例!

创建一个maven工程activeMQ_helloworld,提供两个测试类进行演示.

pom文件导入的依赖

创建一个测试类来做生产者生产消息,这里我用的是队列形式(queue),一对一的消费,我创建了一个队列叫

HelloActiveMQ,并发送了十条消息.

1 public class ActiveMQProducer { 2 @Test 3 public void testProduceMsg() throws Exception{ 4 //连接工厂 5 //使用默认的用户名,密码,路径 6 //路径为 tcp://host:61616 7 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); 8 //获取一个连接 9 Connection connection = connectionFactory.createConnection(); 10 //创建一个会话 11 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 12 //创建队列或者话题 13 Queue queue = session.createQueue("HelloActiveMQ"); 14 //创建生产者或者消费者 15 MessageProducer producer = session.createProducer(queue); 16 //发送消息 17 for (int i = 0; i < 10; i++) { 18 producer.send(session.createTextMessage("activeMQ,你好!"+i)); 19 } 20 //提交操作 21 session.commit(); 22 } 23 }

熟悉ActiveMQ的API,根据API来发送消息,最后的commit不要忘了!!!

在创建一个消费者来对消息进行消费,消费者引用的队列名为之前创建的生产者队列名HelloActiveMQ

1 public class ActiveMQConsumer { 2 @Test 3 public void testConsumeMsg() throws Exception{ 4 // 连接工厂 5 // 使用默认用户名、密码、路径 6 // 路径 tcp://host:61616 7 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); 8 // 获取一个连接 9 Connection connection = connectionFactory.createConnection(); 10 //开启连接 11 connection.start(); 12 //建立会话,第一个参数是否开启事务,为true时,最后需要session.conmit()的提交 13 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 14 // 创建队列或者话题对象 15 Queue queue = session.createQueue("HelloActiveMQ"); 16 // 创建消费者 17 MessageConsumer messageConsumer = session.createConsumer(queue); 18 19 while (true) { 20 TextMessage message = (TextMessage) messageConsumer.receive(5000); 21 if (message != null) { 22 System.out.println(message.getText()); 23 } else { 24 break; 25 } 26 } 27 } 28 }

生产者和消费者都已经创建好,现在就可以开始愉快的测试了~~~

哦,还没开启呢...

安装好的ActiveMQ在本地,进入bin选择win64(我电脑64的),activemq.bat开启

开启后

进入Activemq管理页面,地址http://服务器ip:8161,用户名admin,密码admin,如图

这个消息管理页面非常好用,用的很多,后面说~

现在执行一次生产者testProduceMsg(),生产了十条消息,可以在管理页面看到(queues队列)

显然有十条消息生产了~

现在调用消费者testConsumeMsg(),去消费这十条消息!

控制台打印出十条消息,再去看看消息管理页面>

十条消息已经消费了~~~ok

然而然而业务场景中用的最多的是监听机制,对生产者的消息进行监听,生产者一生产出消息,消费者立马进行消费掉!!!

这里我再进行监听测试>>

在消费者测试类里添加第二个方法(监听消费的方法),线程得一直开着.

1 @Test 2 // 使用监听器消费 3 public void testCosumeMQ2() throws Exception { 4 // 连接工厂 5 // 使用默认用户名、密码、路径 6 // 路径 tcp://host:61616 7 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); 8 // 获取一个连接 9 Connection connection = connectionFactory.createConnection(); 10 // 开启连接 11 connection.start(); 12 // 建立会话 13 // 第一个参数,是否使用事务,如果设置true,操作消息队列后,必须使用 session.commit(); 14 Session session = connection.createSession(false, 15 Session.AUTO_ACKNOWLEDGE); 16 // 创建队列或者话题对象 17 Queue queue = session.createQueue("HelloActiveMQ"); 18 // 创建消费者 19 MessageConsumer messageConsumer = session.createConsumer(queue); 20 messageConsumer.setMessageListener(new MessageListener() { 21 // 每次接收消息,自动调用 onMessage 22 public void onMessage(Message message) { 23 TextMessage textMessage = (TextMessage)message; 24 try { 25 System.out.println(textMessage.getText()); 26 } catch (JMSException e) { 27 e.printStackTrace(); 28 } 29 } 30 }); 31 //不能关闭线程 32 while(true){ 33 34 } 35 }

先执行这个方法使线程一直开启监听,再去执行生产者生产十条消息,可以发现>>>

消息一生产出来立马被监听到消费掉!

简单的入门案例就写到这里,Active整合Spring的简单使用下面开写~~~

注意了!!!  开始整合Spring了...

这次分别用Queue和Topic演示

创建maven工程activeMQ_spring

pom的依赖

1 <dependencies> 2 <dependency> 3 <groupId>org.springframework</groupId> 4 <artifactId>spring-context</artifactId> 5 <version>4.1.7.RELEASE</version> 6 </dependency> 7 <dependency> 8 <groupId>org.springframework</groupId> 9 <artifactId>spring-test</artifactId> 10 <version>4.1.7.RELEASE</version> 11 </dependency> 12 <dependency> 13 <groupId>junit</groupId> 14 <artifactId>junit</artifactId> 15 <version>4.12</version> 16 </dependency> 17 <dependency> 18 <groupId>org.apache.activemq</groupId> 19 <artifactId>activemq-all</artifactId> 20 <version>5.14.0</version> 21 </dependency> 22 <dependency> 23 <groupId>org.springframework</groupId> 24 <artifactId>spring-jms</artifactId> 25 <version>4.1.7.RELEASE</version> 26 </dependency> 27 </dependencies>

如果ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。建议使用5.11.2,5.14.2

此时用到spring-jms消息服务,jms模版和jms的监听处理

在consumer包下创建两个Queue消费者(队列消费者)

QueueConsumer1:

1 package cn.bowen.activemq.consume; 2 3 import javax.jms.JMSException; 4 import javax.jms.Message; 5 import javax.jms.MessageListener; 6 import javax.jms.TextMessage; 7 8 import org.springframework.stereotype.Service; 9 @Service 10 public class QueueConsumer1 implements MessageListener{ 11 12 public void onMessage(Message message) { 13 TextMessage textMessage = (TextMessage)message; 14 try { 15 System.out.println("消费的QueueConsumer1获取消息:"+textMessage.getText()); 16 } catch (JMSException e) { 17 e.printStackTrace(); 18 } 19 } 20 21 }

QueueConsumer2:

1 package cn.bowen.activemq.consume; 2 3 import javax.jms.JMSException; 4 import javax.jms.Message; 5 import javax.jms.MessageListener; 6 import javax.jms.TextMessage; 7 8 import org.springframework.stereotype.Service; 9 @Service 10 public class QueueConsumer2 implements MessageListener{ 11 12 public void onMessage(Message message) { 13 TextMessage textMessage = (TextMessage)message; 14 try { 15 System.out.println("消费的QueueConsumer2获取消息:"+textMessage.getText()); 16 } catch (JMSException e) { 17 e.printStackTrace(); 18 } 19 } 20 21 }

创建两个Topic消费者(话题/广播消费者)

TopicConsumer1:

1 package cn.bowen.activemq.consume; 2 3 import javax.jms.JMSException; 4 import javax.jms.Message; 5 import javax.jms.MessageListener; 6 import javax.jms.TextMessage; 7 8 import org.springframework.stereotype.Service; 9 @Service 10 public class TopicConsumer1 implements MessageListener{ 11 12 public void onMessage(Message message) { 13 TextMessage textMessage = (TextMessage)message; 14 try { 15 System.out.println("消费的TopicConsumer1获取消息:"+textMessage.getText()); 16 } catch (JMSException e) { 17 e.printStackTrace(); 18 } 19 } 20 21 }

TopicConsumer2:

1 package cn.bowen.activemq.consume; 2 3 import javax.jms.JMSException; 4 import javax.jms.Message; 5 import javax.jms.MessageListener; 6 import javax.jms.TextMessage; 7 8 import org.springframework.stereotype.Service; 9 @Service 10 public class TopicConsumer2 implements MessageListener{ 11 12 public void onMessage(Message message) { 13 TextMessage textMessage = (TextMessage)message; 14 try { 15 System.out.println("消费的TopicConsumer2获取消息:"+textMessage.getText()); 16 } catch (JMSException e) { 17 e.printStackTrace(); 18 } 19 } 20 21 }

配置applicationContext-mq-consumer.xml,注释说明配置信息~~~

1 <!-- 扫描包 --> 2 <context:component-scan base-package="cn.bowen.activemq.consume" /> 3 4 <!-- ActiveMQ 连接工厂 --> 5 <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> 6 <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码--> 7 <amq:connectionFactory id="amqConnectionFactory" 8 brokerURL="tcp://localhost:61616" userName="admin" password="admin" /> 9 10 <!-- Spring Caching连接工厂 --> 11 <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> 12 <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> 13 <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> 14 <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> 15 <!-- 同上,同理 --> 16 <!-- <constructor-arg ref="amqConnectionFactory" /> --> 17 <!-- Session缓存数量 --> 18 <property name="sessionCacheSize" value="100" /> 19 </bean> 20 21 <!-- 消息消费者 start--> 22 23 <!-- 定义Queue监听器 --> 24 <jms:listener-container destination-type="queue" container-type="default" 25 connection-factory="connectionFactory" acknowledge="auto"> 26 <!-- destination是队列或话题名称 --> 27 <!-- 默认注册bean名称,应该是类名首字母小写 --> 28 <jms:listener destination="springQueue" ref="queueConsumer1"/> 29 <jms:listener destination="springQueue" ref="queueConsumer2"/> 30 </jms:listener-container> 31 32 <!-- 定义Topic监听器 --> 33 <jms:listener-container destination-type="topic" container-type="default" 34 connection-factory="connectionFactory" acknowledge="auto"> 35 <jms:listener destination="springTopic" ref="topicConsumer1"/> 36 <jms:listener destination="springTopic" ref="topicConsumer2"/> 37 </jms:listener-container> 38 39 <!-- 消息消费者 end -->

配置applicationContext-mq.xml,注释说明配置信息

1 <!-- 扫描包 --> 2 <context:component-scan base-package="cn.bowen.activemq.produce" /> 3 4 <!-- ActiveMQ 连接工厂 --> 5 <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> 6 <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码--> 7 <amq:connectionFactory id="amqConnectionFactory" 8 brokerURL="tcp://localhost:61616" userName="admin" password="admin" /> 9 10 <!-- Spring Caching连接工厂 --> 11 <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> 12 <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> 13 <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> 14 <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> 15 <!-- 同上,同理 --> 16 <!-- <constructor-arg ref="amqConnectionFactory" /> --> 17 <!-- Session缓存数量 --> 18 <property name="sessionCacheSize" value="100" /> 19 </bean> 20 21 <!-- Spring JmsTemplate 的消息生产者 start--> 22 23 <!-- 定义JmsTemplate的Queue类型 --> 24 <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> 25 <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> 26 <constructor-arg ref="connectionFactory" /> 27 <!-- 非pub/sub模型(发布/订阅),即队列模式 --> 28 <property name="pubSubDomain" value="false" /> 29 </bean> 30 31 <!-- 定义JmsTemplate的Topic类型 --> 32 <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> 33 <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> 34 <constructor-arg ref="connectionFactory" /> 35 <!-- pub/sub模型(发布/订阅) --> 36 <property name="pubSubDomain" value="true" /> 37 </bean> 38 39 <!--Spring JmsTemplate 的消息生产者 end-->

在produce包下创建QueueProducer生产者,引用模版的JmsTemplate的Queue类型

1 package cn.bowen.activemq.produce; 2 3 import javax.jms.JMSException; 4 import javax.jms.Message; 5 import javax.jms.Session; 6 7 import org.springframework.beans.factory.annotation.Autowired; 8 import org.springframework.beans.factory.annotation.Qualifier; 9 import org.springframework.jms.core.JmsTemplate; 10 import org.springframework.jms.core.MessageCreator; 11 import org.springframework.stereotype.Service; 12 13 @Service 14 public class QueueProducer { 15 @Autowired 16 @Qualifier("jmsQueueTemplate") 17 private JmsTemplate jmsTemplate; 18 19 public void send(String queueName,final String msg){ 20 jmsTemplate.send(queueName, new MessageCreator() { 21 22 public Message createMessage(Session session) throws JMSException { 23 return session.createTextMessage(msg); 24 } 25 }); 26 } 27 }

在produce包下创建TopicProducer生产者,引用模版的JmsTemplate的Topic类型

1 package cn.bowen.activemq.produce; 2 3 import javax.jms.JMSException; 4 import javax.jms.Message; 5 import javax.jms.Session; 6 7 import org.springframework.beans.factory.annotation.Autowired; 8 import org.springframework.beans.factory.annotation.Qualifier; 9 import org.springframework.jms.core.JmsTemplate; 10 import org.springframework.jms.core.MessageCreator; 11 import org.springframework.stereotype.Service; 12 13 @Service 14 public class TopicProducer { 15 @Autowired 16 @Qualifier("jmsTopicTemplate") 17 private JmsTemplate jmsTemplate; 18 19 public void send(String topicName,final String msg){ 20 jmsTemplate.send(topicName, new MessageCreator() { 21 22 public Message createMessage(Session session) throws JMSException { 23 return session.createTextMessage(msg); 24 } 25 }); 26 } 27 }

最后生产者和消费者的Queue和Topic俩种类型都准备好了~~~

准备测试>>>

测试我使用的是spring的JUnit4来进行注解测试

在test包下创建ConsumerTest(消费者监听)

1 package cn.bowen.activemq; 2 3 import org.junit.Test; 4 import org.junit.runner.RunWith; 5 import org.springframework.test.context.ContextConfiguration; 6 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; 7 8 @RunWith(SpringJUnit4ClassRunner.class) 9 @ContextConfiguration(locations="classpath:applicationContext-mq-consumer.xml") 10 public class ConsumerTest { 11 12 @Test 13 public void testProduce(){ 14 //线程不能关闭 15 while(true){} 16 } 17 }

创建生产者ProducerTest生产

1 package cn.bowen.activemq; 2 3 import org.junit.Test; 4 import org.junit.runner.RunWith; 5 import org.springframework.beans.factory.annotation.Autowired; 6 import org.springframework.test.context.ContextConfiguration; 7 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; 8 9 import cn.bowen.activemq.produce.QueueProducer; 10 import cn.bowen.activemq.produce.TopicProducer; 11 12 @RunWith(SpringJUnit4ClassRunner.class) 13 @ContextConfiguration(locations="classpath:applicationContext-mq.xml") 14 public class ProducerTest { 15 @Autowired 16 private QueueProducer queueProducer; 17 18 @Autowired 19 private TopicProducer topicProducer; 20 21 @Test 22 public void testProduce(){ 23 queueProducer.send("springQueue", "这是一个队列消息!"); 24 topicProducer.send("springTopic", "这是一个广播/话题消息!"); 25 } 26 }

先执行消费者进行监听>>>

在通过生产者生产第一次消息发现>>

在通过生产者生产第二次消息发现>>

在通过生产者生产第三次消息发现>>

不难发现Queue和Topic的区别???

发送消息类型为Topic时,是以广播的形式,每一个消费者都能消费到~~~

而发送消息Queue类型时,是作为一对一队列形式的消费,一条消息只能一个消费者消费~~~(两个消费者又好像是轮流消费哈)

两种类型应用的业务场景不一样!

今天Active就写到这里啦,第一次写见笑见笑~~~别喷我哦~~~

转载请注明原文地址: https://www.6miu.com/read-1400204.html

最新回复(0)