JMS(java消息服务)
是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。JMS就是一个java平台的技术规范。
ActiveMQ
是Apache出品,最流行的,能力强劲的开源消息总线。是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。
ActiveMQ安装
1,下载压缩包解压
下载地址:http://activemq.apache.org/
2,解压
tar -zxvf apache-activemq-5.6.0-bin.tar.gz
3,启动ActiveMQ
/liud/ActiveMQ/apache-activemq-5.6.0/bin 目录下运行
./activemq start
查看监听的端口61616
netstat -tnlp
web控制台地址http://120.79.25.197:8161/admin
admin/admin
消息传输数据类型
JMS规范中的消息类型包括TextMessage、MapMessage、ObjectMessage、BytesMessage、和StreamMessage等五种
特性
每个消息只有一个接收者。
消息的发送者和消息的接收者没有时间的依赖性。
当消息发送者发送消息的时候,无论接收者程序在不在运行,都能获取到消息。
代码实现
生产消息
spring配置activemq 连接工厂
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://120.79.25.197:61616" /> </bean>配置缓存连接工厂
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="connectionFactory" /> <!-- Session缓存数量 --> <property name="sessionCacheSize" value="10" /> </bean>设置发送消息的目的地队列
<bean id="queue1" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 设置消息队列的名字 --> <constructor-arg value="queueMessage1" /> </bean>消息的生产者
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="cachingConnectionFactory" /> <!-- 非pub/sub模型(发布/订阅),即队列模式 --> <property name="pubSubDomain" value="false" /> </bean>创建一个服务类,使用jmsQueueTemplate发送文本消息
package com.service; import javax.annotation.Resource; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service; @SuppressWarnings("restriction") @Service("QueueService") public class QueueService { @Resource(name = "jmsQueueTemplate") private JmsTemplate jmsQueueTemplate; public void send(Destination destination, final Object message) { jmsQueueTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message.toString()); } }); } }创建测试类
package com.test; import javax.annotation.Resource; import org.apache.activemq.command.ActiveMQQueue; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import com.service.QueueService; @SuppressWarnings("restriction") @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = { "classpath:applicationContext.xml", "classpath:applicationContext-jms.xml" }) public class TestQueue { @Autowired private QueueService queueService; @Resource(name = "queue1") private ActiveMQQueue queue1; @Test public void TestQueue1() { System.out.println("begin"); queueService.send(queue1, "nihao_detail2"); System.out.println("end"); } }运行测试类TestQueue1方法,activemq控制台中可以看到队列消息
Number Of Consumers 消费者数量
Messages Enqueued 进入队列的消息数量,这个数据只增不减
Messages Dequeued 出队列的消息数量
接收消息
使用监听spring配置
<!-- 消息监听器 --> <bean id="QueueMessageListener" class="com.listener.QueueMessageListener"/> <!-- 消息监听容器 --> <bean id="QueueContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="cachingConnectionFactory" /> <property name="destination" ref="queue1" /> <property name="messageListener" ref="QueueMessageListener" /> </bean>监听器类重写onMeassage方法
package com.listener; import javax.jms.BytesMessage; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.StreamMessage; import javax.jms.TextMessage; public class QueueMessageListener implements MessageListener { public void onMessage(Message message) { try { // 如果是文本消息 if (message instanceof TextMessage) { TextMessage tm = (TextMessage) message; System.out.println("get textMessage:\t" + tm.getText()); } // 如果是Map消息 if (message instanceof MapMessage) { MapMessage mm = (MapMessage) message; System.out.println("get mapMessage:\t" + mm.getString("msgId")); } // 如果是Object消息 if (message instanceof ObjectMessage) { ObjectMessage om = (ObjectMessage) message; String exampleUser = (String) om.getObject(); System.out.println("get ObjectMessage:\t" + exampleUser.toString()); } // 如果是bytes消息 if (message instanceof BytesMessage) { byte[] b = new byte[1024]; int len = -1; BytesMessage bm = (BytesMessage) message; while ((len = bm.readBytes(b)) != -1) { System.out.println(new String(b, 0, len)); } } // 如果是Stream消息 if (message instanceof StreamMessage) { StreamMessage sm = (StreamMessage) message; System.out.println(sm.readString()); System.out.println(sm.readInt()); } } catch (JMSException e) { e.printStackTrace(); } } }启动中间件后,消息队列中增加新消息后onMessage方法就会监听到。
也可以主动接收消息,代码如下:
@Resource(name = "jmsQueueTemplate") private JmsTemplate jmsQueueTemplate; @Test public void TestConsumerMessage() throws Exception { Message message = jmsQueueTemplate.receive(queue1); if (message instanceof TextMessage) { TextMessage tm = (TextMessage) message; System.out.println("get1 textMessage:\t" + tm.getText()); } }
特点
一个消息可传递多个订阅者;
发布和订阅者有时间依赖性,只有当客户端创建订阅后才能接收消息,并且订阅者需一直保持活动状态;
代码实现
生产消息定义消息的目的地
<!-- 发送消息的目的地(一个队列) --> <bean id="Topic1" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 设置消息队列的名字 --> <constructor-arg value="TopicMessage1" /> </bean>使用spring生产消息模板
<!-- 定义JmsTemplate的Topic类型 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <constructor-arg ref="cachingConnectionFactory" /> <!-- pub/sub模型(发布/订阅) --> <property name="pubSubDomain" value="true" /> </bean>生产消息服务类
package com.service; import javax.annotation.Resource; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service; @SuppressWarnings("restriction") @Service("TopicService") public class TopicService { @Resource(name = "jmsTopicTemplate") private JmsTemplate jmsTopicTemplate; public void send(Destination destination, final Object message) { jmsTopicTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message.toString()); } }); } }测试类
package com.test; import javax.annotation.Resource; import org.apache.activemq.command.ActiveMQTopic; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import com.service.TopicService; @SuppressWarnings("restriction") @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = { "classpath:applicationContext.xml", "classpath:applicationContext-jmsProducer.xml" }) public class TestTopic { @Autowired private TopicService topicService; @Resource(name = "Topic1") private ActiveMQTopic TopicMessage; @Test public void TestTopic1() { System.out.println("begin"); topicService.send(TopicMessage, "nihao_topicmessage2"); System.out.println("end"); } }接收消息
监听器类1
package com.listener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class TopicMessageListener1 implements MessageListener { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("---------消息消费1---------"); System.out.println("消息内容:\t" + tm.getText()); System.out.println("消息ID:\t" + tm.getJMSMessageID()); System.out.println("消息Destination:\t" + tm.getJMSDestination()); } catch (JMSException e) { e.printStackTrace(); } } }监听器类2
package com.listener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class TopicMessageListener2 implements MessageListener { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("---------消息消费2---------"); System.out.println("消息内容:\t" + tm.getText()); System.out.println("消息ID:\t" + tm.getJMSMessageID()); System.out.println("消息Destination:\t" + tm.getJMSDestination()); } catch (JMSException e) { e.printStackTrace(); } } }sping配置文件中添加监听器
<!-- 消息监听器 --> <bean id="TopicMessageListener1" class="com.listener.TopicMessageListener1"/> <bean id="TopicContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="cachingConnectionFactory" /> <property name="destination" ref="Topic1" /> <property name="messageListener" ref="TopicMessageListener1" /> </bean> <bean id="TopicMessageListener2" class="com.listener.TopicMessageListener2"/> <bean id="TopicContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="cachingConnectionFactory" /> <property name="destination" ref="Topic1" /> <property name="messageListener" ref="TopicMessageListener2" /> </bean>