一。名词:
1.JMS:即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信【提供产生-发送-接收消息接口】。Java消息服务是一个与具体平台无关的API,绝大多数MOM(消息中间件)提供商都对JMS提供支持。
2.MQ:Message Queue消息队列
3.ActiveMQ:Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现
附:消息中间件的用途和优点
【1】. 将数据从一个应用程序传送到另一个应用程序,或者从软件的一个模块传送到另外一个模块; 【2】. 负责建立网络通信的通道,进行数据的可靠传送。 【3】. 保证数据不重发,不丢失 【4】. 能够实现跨平台操作,能够为不同操作系统上的软件集成技工数据传送服务
应用ActiveMQ场景:
【1】多个项目之间集成 (1) 跨平台 (2) 多语言 (3) 多项目
【2】降低系统间模块的耦合度,解耦 (1) 软件扩展性
【3】系统前后端隔离 (1) 前后端隔离,屏蔽高安全区
二。体系架构:
1.JMS提供者
连接面向消息中间件的,JMS接口的一个实现。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。
2.JMS客户
生产或消费基于消息的Java的应用程序或对象。
3.JMS生产者
创建并发送消息的JMS客户。
4.JMS消费者
接收消息的JMS客户。
5.JMS消息
包括可以在JMS客户之间传递的数据的对象
6.JMS队列
一个容纳那些被发送的等待阅读的消息的区域。与队列名字所暗示的意思不同,消息的接受顺序并不一定要与消息的发送顺序相同。一旦一个消息被阅读,该消息将被从队列中移走。
7JMS主题
一种支持发送消息给多个订阅者的机制。
三。支持的模型
Java消息服务应用程序结构支持两种模型:
1.点对点或队列模型
2.发布者/订阅者模型
四。ActiveMQ下载地址
http://activemq.apache.org/download.html //可下载最新版本
解压apache-activemq-5.15.2,文件目录说明:
从它的目录来说,还是很简单的:
bin存放的是脚本文件conf存放的是基本配置文件data存放的是日志文件docs存放的是说明文档examples存放的是简单的实例lib存放的是activemq所需jar包 webapps用于存放项目的目录==》启动MQ服务器:
根据操作系统不同,进入相应win64/win32位目录,双击activemq.bat启动MQ。控制台部分显示:
ActiveMQ默认启动时,启动了内置的jetty服务器,提供一个用于监控ActiveMQ的admin应用
进入ActionMQ服务监控地址:浏览器输入http://127.0.0.1:8161/admin
登录成功,页面显示:
重点关注菜单栏:
【1】Queues:队列【2】Topics
==》停止服务器,只需要按着Ctrl+Shift+C,之后输入y即可
五。应用IDEA构建Maven项目
File-》New-》Module...-》Maven-》勾选-》选择-》Next -》
GroupId:com.jd.myMaven | ArtifactId:activeMQ | version:默认 -》Finish
项目构建成功!项目结构如下所示:
pom.xml中添加
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.5.8</version> </dependency> <!-- activemq 相关maven依赖 --> <dependency> <groupId>javax.jms</groupId> <artifactId>jms</artifactId> <version>1.1</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.5.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.7.0</version> </dependency> </dependencies>六。创建生产者类,模拟生产者发消息
Step1:java/activemq/JMSProducer.java
package activemq; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMapMessage; import javax.jms.*; import java.util.Map; /** * 消息生产者 */ public class JMSProducer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认的连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认的连接密码 private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认的连接地址 private static final int SENDNUM = 2;//发送的消息数量 public static void main(String[] args) { ConnectionFactory connectionFactory = null;//连接工厂 Connection connection = null;//连接 Session session = null;//会话,接受或者发送消息的线程 Destination destination = null;//消息的目的地 MessageProducer messageProducer = null;//消息的生产者 //实例化连接工厂(指定连接用户名|密码|连接地址) connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKERURL); try { connection = connectionFactory.createConnection();//通过连接工厂获取连接 connection.start();//启动连接 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//创建session destination = session.createQueue("TestQueue");//创建消息队列 messageProducer = session.createProducer(destination);//创建消息生产者 sendMessage(session, messageProducer);//发送消息 session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } //发送消息 private static void sendMessage(Session session, MessageProducer messageProducer) { try { //创建消息Map<key,value> MapMessage message = session.createMapMessage(); message.setString("userName", "syz"); message.setInt("age", 30); message.setDouble("salary", 1000); message.setBoolean("isGirl", true); System.out.println("Sending:" + ((ActiveMQMapMessage)message).getContentMap()); //发送消息 messageProducer.send(message); } catch (JMSException e) { e.printStackTrace(); } } }Step2:启动ActiveMQ,运行生产者类,模拟生产消息
控制台显示:
Sending:{isGirl=true, userName=syf, salary=1000.0, age=30}
打开浏览器,访问activeMQ监控画面 http://127.0.0.1:8161/admin
【1】Queues
【2】Topics
七。模拟消费者消耗数据
package activemq; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMapMessage; import javax.jms.*; /** * 消息消费者 */ public class JMSConsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认的连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认的连接密码 private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认的连接地址 public static void main(String[] args) { ConnectionFactory connectionFactory = null;//连接工厂 Connection connection = null;//连接 Session session = null;//会话,接受或者发送消息的线程 Destination destination = null;//消息的目的地 MessageConsumer messageConsumer = null;//消息的消费者 //实例化连接工厂(指定连接用户名|密码|连接地址) connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKERURL); try { connection = connectionFactory.createConnection();//通过连接工厂获取连接 connection.start();//启动连接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建session destination = session.createQueue("TestQueue");//创建连接的消息队列(TestQueue:生产者队列名) messageConsumer = session.createConsumer(destination);//创建消息消费者 while (true) { MapMessage mapMessage= (MapMessage) messageConsumer.receive(10000); //TextMessage textMessage= (TextMessage) messageConsumer.receive(10000);//10秒接收 if (mapMessage != null) { System.out.println("收到的消息:" + ((ActiveMQMapMessage)mapMessage).getContentMap()); } else { System.out.println("没有消息:"); } } } catch (JMSException e) { e.printStackTrace(); } } }运行代码:
控制台显示:收到的消息:{userName=syf, salary=1000.0, isGirl=true, age=30}
打开浏览器,查看activeMQ监控画面 http://127.0.0.1:8161/admin
【1】Queues(1条消息被消费)
【2】Topics
示例-----企业应用最常见方式之监听器监听方式
增加监听类JMSListener
package activemq; import org.apache.activemq.command.ActiveMQMapMessage; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; /** * 消息监听 * */ public class JMSListener implements MessageListener{ public void onMessage(Message message) { try { System.out.println("receive Message:"+((ActiveMQMapMessage)message).getContentMap()); } catch (JMSException e) { e.printStackTrace(); } } }重新模拟一个消费者,应用监听器监听消息 package activemq; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMapMessage; import javax.jms.*; public class JMSConsumerByListener { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认的连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认的连接密码 private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认的连接地址 public static void main(String[] args) { ConnectionFactory connectionFactory = null;//连接工厂 Connection connection = null;//连接 Session session = null;//会话,接受或者发送消息的线程 Destination destination = null;//消息的目的地 MessageConsumer messageConsumer = null;//消息的消费者 //实例化连接工厂(指定连接用户名|密码|连接地址) connectionFactory = new ActiveMQConnectionFactory(JMSConsumerByListener.USERNAME, JMSConsumerByListener.PASSWORD, JMSConsumerByListener.BROKERURL); try { connection = connectionFactory.createConnection();//通过连接工厂获取连接 connection.start();//启动连接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建session destination = session.createQueue("TestQueue2");//创建连接的消息队列(TestQueue:生产者队列名) messageConsumer = session.createConsumer(destination);//创建消息消费者 messageConsumer.setMessageListener(new JMSListener());//注册消息监听 } catch (JMSException e) { e.printStackTrace(); } } }=》先执行生成者,生产消息!
控制台打印:Sending:{isGirl=true, userName=kaixin, salary=1000.0, age=30}
ActiveMQ监控画面显示:
=》再执行消费者,消费消息!
控制台打印:receive Message:{userName=kaixin, salary=1000.0, isGirl=true, age=30}
ActiveMQ监控画面显示:
代码地址 : https://github.com/shiyuezhong11/activeMQ