ActiveMQ Notes

xiaoxiao 2021-02-28

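Concepts

JMS (Java Message Service)

JMS is the Java platform API for message-oriented middleware (MOM). It is used to send messages between two applications, or within a distributed system, for asynchronous communication. JMS is a technical specification for the Java platform, not an implementation.

ActiveMQ

ActiveMQ is a popular, capable open-source message bus from Apache. It is a JMS provider that fully supports the JMS 1.1 and J2EE 1.4 specifications.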

Installing ActiveMQ

1. Download the archive

Download page: http://activemq.apache.org/

2. Extract the archive

tar -zxvf apache-activemq-5.6.0-bin.tar.gz

3. Start ActiveMQ

Run the following in the /liud/ActiveMQ/apache-activemq-5.6.0/bin directory:

./activemq start

Check that the broker is listening on port 61616:

netstat -tnlp
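
The same check can also be done from a client machine, where netstat on the server is not available. The following is a minimal sketch using only the JDK: it tries to open a TCP connection and reports whether it succeeded. The class name BrokerPortCheck, the localhost default, and the 2000 ms timeout are assumptions for illustration; a successful connection only shows that something is listening on the port, not that it is ActiveMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isPortOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, host unreachable, or timed out
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical defaults: a broker on this machine, port 61616 as in the notes
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 61616;
        System.out.println(host + ":" + port + " open? " + isPortOpen(host, port, 2000));
    }
}
```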

Web console: http://120.79.25.197:8161/admin

Login: admin/admin

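Message body types

The JMS specification defines five message body types: TextMessage, MapMessage, ObjectMessage, BytesMessage, and StreamMessage.

Messaging models

1. Point-to-point messaging

Characteristics

Each message has exactly one receiver.

The sender and the receiver have no timing dependency on each other.

The receiver can still get a message whether or not it was running at the moment the sender sent it.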
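
These three properties can be sketched without a broker. The example below is an in-memory model using only the JDK, not ActiveMQ code: a BlockingQueue stands in for the queue, and the class name PointToPointSketch and the message names are made up for illustration. Messages are sent before any consumer exists, and two consumers that start later each receive a disjoint share.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PointToPointSketch {

    /** Sends four messages with no consumer running, then drains them with two consumers. */
    public static List<List<String>> run() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // The sender runs first; the queue retains the messages until someone takes them.
        for (int i = 1; i <= 4; i++) {
            queue.add("msg-" + i);
        }

        // Two consumers start later and take turns polling the same queue.
        List<String> consumerA = new ArrayList<>();
        List<String> consumerB = new ArrayList<>();
        boolean turnA = true;
        String message;
        while ((message = queue.poll()) != null) {
            (turnA ? consumerA : consumerB).add(message);
            turnA = !turnA;
        }
        return Arrays.asList(consumerA, consumerB);
    }

    public static void main(String[] args) {
        List<List<String>> result = run();
        System.out.println("A: " + result.get(0)); // A: [msg-1, msg-3]
        System.out.println("B: " + result.get(1)); // B: [msg-2, msg-4]
    }
}
```

No message appears in both lists, which is the "one receiver per message" property; a real broker decides the distribution among consumers itself, so the strict alternation here is only a simplification.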

Implementation

Producing messages

Configure the ActiveMQ connection factory in Spring

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://120.79.25.197:61616" />
</bean>

Configure the caching connection factory

<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="connectionFactory" />
    <!-- Number of sessions to cache -->
    <property name="sessionCacheSize" value="10" />
</bean>

Define the destination queue for outgoing messages

<bean id="queue1" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- Name of the message queue -->
    <constructor-arg value="queueMessage1" />
</bean>

Message producer

<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
    <constructor-arg ref="cachingConnectionFactory" />
    <!-- Not the pub/sub (publish/subscribe) model, i.e. queue mode -->
    <property name="pubSubDomain" value="false" />
</bean>

Create a service class that uses jmsQueueTemplate to send text messages

package com.service;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

@SuppressWarnings("restriction")
@Service("QueueService")
public class QueueService {

    @Resource(name = "jmsQueueTemplate")
    private JmsTemplate jmsQueueTemplate;

    // Sends message.toString() to the given destination as a TextMessage.
    public void send(Destination destination, final Object message) {
        jmsQueueTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message.toString());
            }
        });
    }
}

Create a test class

package com.test;

import javax.annotation.Resource;

import org.apache.activemq.command.ActiveMQQueue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.service.QueueService;

@SuppressWarnings("restriction")
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext.xml", "classpath:applicationContext-jms.xml" })
public class TestQueue {

    @Autowired
    private QueueService queueService;

    @Resource(name = "queue1")
    private ActiveMQQueue queue1;

    @Test
    public void TestQueue1() {
        System.out.println("begin");
        queueService.send(queue1, "nihao_detail2");
        System.out.println("end");
    }
}

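Run the TestQueue1 method of the test class; the queued message then shows up in the ActiveMQ console.

Number Of Consumers: the number of consumers

Messages Enqueued: the number of messages that have entered the queue; this counter only increases, never decreases

Messages Dequeued: the number of messages that have left the queue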
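
How these counters relate can be sketched in a few lines. The class below is a toy model, not ActiveMQ code, and its names are invented for illustration. It assumes the simplest case, where every message that enters the queue is either still waiting or has been consumed; the difference between the two counters is then the backlog.

```java
public class QueueCounters {
    private long enqueued; // only ever grows, like "Messages Enqueued"
    private long dequeued; // grows each time a message is consumed

    /** Records that a message entered the queue. */
    public void onSend() {
        enqueued++;
    }

    /** Records that a consumer took a message off the queue. */
    public void onConsume() {
        if (dequeued >= enqueued) {
            throw new IllegalStateException("no pending message to consume");
        }
        dequeued++;
    }

    public long enqueued() { return enqueued; }

    public long dequeued() { return dequeued; }

    /** Messages still waiting in the queue under this simplified model. */
    public long pending() { return enqueued - dequeued; }

    public static void main(String[] args) {
        QueueCounters counters = new QueueCounters();
        counters.onSend();
        counters.onSend();
        counters.onSend();
        counters.onConsume();
        // prints "enqueued=3 dequeued=1 pending=2"
        System.out.println("enqueued=" + counters.enqueued()
                + " dequeued=" + counters.dequeued()
                + " pending=" + counters.pending());
    }
}
```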

Receiving messages

Listener configuration in Spring

<!-- Message listener -->
<bean id="QueueMessageListener" class="com.listener.QueueMessageListener"/>

<!-- Message listener container -->
<bean id="QueueContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <property name="destination" ref="queue1" />
    <property name="messageListener" ref="QueueMessageListener" />
</bean>

The listener class implements the onMessage method

package com.listener;

import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

public class QueueMessageListener implements MessageListener {

    public void onMessage(Message message) {
        try {
            // Text message
            if (message instanceof TextMessage) {
                TextMessage tm = (TextMessage) message;
                System.out.println("get textMessage:\t" + tm.getText());
            }
            // Map message
            if (message instanceof MapMessage) {
                MapMessage mm = (MapMessage) message;
                System.out.println("get mapMessage:\t" + mm.getString("msgId"));
            }
            // Object message: the payload may be any Serializable object,
            // so print it as an Object instead of casting it to String
            if (message instanceof ObjectMessage) {
                ObjectMessage om = (ObjectMessage) message;
                Object payload = om.getObject();
                System.out.println("get ObjectMessage:\t" + payload);
            }
            // Bytes message: read in chunks until readBytes returns -1 (end of body)
            if (message instanceof BytesMessage) {
                byte[] b = new byte[1024];
                int len = -1;
                BytesMessage bm = (BytesMessage) message;
                while ((len = bm.readBytes(b)) != -1) {
                    System.out.println(new String(b, 0, len));
                }
            }
            // Stream message: assumes the sender wrote a String followed by an int
            if (message instanceof StreamMessage) {
                StreamMessage sm = (StreamMessage) message;
                System.out.println(sm.readString());
                System.out.println(sm.readInt());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

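Once the broker and the listener container are running, onMessage is called whenever a new message is added to the queue.

Messages can also be received actively, as follows:

// Fragment to add to the TestQueue class; it also needs imports for
// javax.jms.Message, javax.jms.TextMessage and org.springframework.jms.core.JmsTemplate.
@Resource(name = "jmsQueueTemplate")
private JmsTemplate jmsQueueTemplate;

@Test
public void TestConsumerMessage() throws Exception {
    // Synchronously pull one message from queue1
    Message message = jmsQueueTemplate.receive(queue1);
    if (message instanceof TextMessage) {
        TextMessage tm = (TextMessage) message;
        System.out.println("get1 textMessage:\t" + tm.getText());
    }
}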
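
An active receive is a blocking call, so it matters what happens when the queue is empty. As far as I know, JmsTemplate waits indefinitely by default, and setReceiveTimeout can bound the wait, after which receive returns null. The sketch below is only an analogy built on the JDK's BlockingQueue, not Spring code: the class TimedReceiveSketch and its receive method are hypothetical, and they show the "wait up to a limit, then return null" behavior that a caller has to handle.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedReceiveSketch {

    /** Waits up to timeoutMs for a message; returns null if none arrives in time. */
    public static String receive(BlockingQueue<String> queue, long timeoutMs) {
        try {
            return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report "no message"
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Empty queue: the call gives up after 100 ms instead of hanging
        System.out.println(receive(queue, 100)); // null

        queue.add("hello");
        // A message is waiting: the call returns immediately
        System.out.println(receive(queue, 100)); // hello
    }
}
```

This is also why the test method above guards with instanceof before casting: a null result fails that check and is simply skipped.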

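2. Publish/subscribe messaging

Characteristics

One message can be delivered to multiple subscribers.

Publishers and subscribers have a timing dependency: a client receives messages only after it has created its subscription, and with a non-durable subscription, as used here, the subscriber must stay active to keep receiving them.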
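
The contrast with a queue can be sketched in memory. The class below is a toy model using only the JDK, not ActiveMQ code: PubSubSketch and its methods are invented for illustration, and it models only non-durable subscriptions. Every subscriber that exists at publish time gets its own copy, and nothing is stored for subscribers that arrive later.

```java
import java.util.ArrayList;
import java.util.List;

public class PubSubSketch {
    // One inbox per current subscriber
    private final List<List<String>> subscribers = new ArrayList<>();

    /** Registers a new subscriber and returns its inbox. */
    public List<String> subscribe() {
        List<String> inbox = new ArrayList<>();
        subscribers.add(inbox);
        return inbox;
    }

    /** Delivers a copy to every current subscriber; nothing is kept for later ones. */
    public void publish(String message) {
        for (List<String> inbox : subscribers) {
            inbox.add(message);
        }
    }

    public static void main(String[] args) {
        PubSubSketch topic = new PubSubSketch();
        topic.publish("before-anyone-subscribed"); // no subscribers yet, so it is dropped

        List<String> first = topic.subscribe();
        List<String> second = topic.subscribe();
        topic.publish("m1");

        List<String> late = topic.subscribe();
        topic.publish("m2");

        System.out.println(first);  // [m1, m2]
        System.out.println(second); // [m1, m2]
        System.out.println(late);   // [m2]
    }
}
```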

Implementation

Producing messages

Define the message destination

<!-- Destination for outgoing messages (a topic) -->
<bean id="Topic1" class="org.apache.activemq.command.ActiveMQTopic">
    <!-- Name of the topic -->
    <constructor-arg value="TopicMessage1" />
</bean>

Use a Spring template to produce messages

<!-- JmsTemplate for topics -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
    <!-- This is the Spring-provided ConnectionFactory defined earlier -->
    <constructor-arg ref="cachingConnectionFactory" />
    <!-- pub/sub (publish/subscribe) model -->
    <property name="pubSubDomain" value="true" />
</bean>

Producer service class

package com.service;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

@SuppressWarnings("restriction")
@Service("TopicService")
public class TopicService {

    @Resource(name = "jmsTopicTemplate")
    private JmsTemplate jmsTopicTemplate;

    // Publishes message.toString() to the given destination as a TextMessage.
    public void send(Destination destination, final Object message) {
        jmsTopicTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message.toString());
            }
        });
    }
}

Test class

package com.test;

import javax.annotation.Resource;

import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.service.TopicService;

@SuppressWarnings("restriction")
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext.xml", "classpath:applicationContext-jmsProducer.xml" })
public class TestTopic {

    @Autowired
    private TopicService topicService;

    @Resource(name = "Topic1")
    private ActiveMQTopic TopicMessage;

    @Test
    public void TestTopic1() {
        System.out.println("begin");
        topicService.send(TopicMessage, "nihao_topicmessage2");
        System.out.println("end");
    }
}

Receiving messages

Listener class 1

package com.listener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class TopicMessageListener1 implements MessageListener {

    public void onMessage(Message message) {
        // This listener expects text messages only
        TextMessage tm = (TextMessage) message;
        try {
            System.out.println("---------message consumed 1---------");
            System.out.println("content:\t" + tm.getText());
            System.out.println("message ID:\t" + tm.getJMSMessageID());
            System.out.println("destination:\t" + tm.getJMSDestination());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

Listener class 2

package com.listener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class TopicMessageListener2 implements MessageListener {

    public void onMessage(Message message) {
        // This listener expects text messages only
        TextMessage tm = (TextMessage) message;
        try {
            System.out.println("---------message consumed 2---------");
            System.out.println("content:\t" + tm.getText());
            System.out.println("message ID:\t" + tm.getJMSMessageID());
            System.out.println("destination:\t" + tm.getJMSDestination());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

Add the listeners to the Spring configuration file

<!-- Message listeners -->
<bean id="TopicMessageListener1" class="com.listener.TopicMessageListener1"/>

<bean id="TopicContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <property name="destination" ref="Topic1" />
    <property name="messageListener" ref="TopicMessageListener1" />
</bean>

<bean id="TopicMessageListener2" class="com.listener.TopicMessageListener2"/>

<bean id="TopicContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <property name="destination" ref="Topic1" />
    <property name="messageListener" ref="TopicMessageListener2" />
</bean>
