ActiveMQ简单教程

xiaoxiao2021-02-28  22

一。名词:

1.JMS:即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信【提供产生-发送-接收消息接口】。Java消息服务是一个与具体平台无关的API,绝大多数MOM(消息中间件)提供商都对JMS提供支持。

2.MQ:Message Queue消息队列

3.ActiveMQ:Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现

附:消息中间件的用途和优点

【1】. 将数据从一个应用程序传送到另一个应用程序,或者从软件的一个模块传送到另外一个模块; 【2】. 负责建立网络通信的通道,进行数据的可靠传送。 【3】. 保证数据不重发,不丢失 【4】. 能够实现跨平台操作,能够为不同操作系统上的软件集成技工数据传送服务

应用ActiveMQ场景:

【1】多个项目之间集成      (1) 跨平台  (2) 多语言  (3) 多项目

【2】降低系统间模块的耦合度,解耦      (1) 软件扩展性

【3】系统前后端隔离      (1) 前后端隔离,屏蔽高安全区

二。体系架构:

1.JMS提供者

   连接面向消息中间件的,JMS接口的一个实现。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。

2.JMS客户

   生产或消费基于消息的Java的应用程序或对象。

3.JMS生产者

   创建并发送消息的JMS客户。

4.JMS消费者

    接收消息的JMS客户。

5.JMS消息

    包括可以在JMS客户之间传递的数据的对象

6.JMS队列

    一个容纳那些被发送的等待阅读的消息的区域。与队列名字所暗示的意思不同,消息的接受顺序并不一定要与消息的发送顺序相同。一旦一个消息被阅读,该消息将被从队列中移走。

7JMS主题

    一种支持发送消息给多个订阅者的机制。

三。支持的模型

Java消息服务应用程序结构支持两种模型:

1.点对点或队列模型

2.发布者/订阅者模型

四。ActiveMQ下载地址

http://activemq.apache.org/download.html  //可下载最新版本

解压apache-activemq-5.15.2,文件目录说明:

从它的目录来说,还是很简单的:

bin存放的是脚本文件conf存放的是基本配置文件data存放的是日志文件docs存放的是说明文档examples存放的是简单的实例lib存放的是activemq所需jar包 webapps用于存放项目的目录

==》启动MQ服务器:

根据操作系统不同,进入相应win64/win32位目录,双击activemq.bat启动MQ。控制台部分显示:

ActiveMQ默认启动时,启动了内置的jetty服务器,提供一个用于监控ActiveMQ的admin应用

进入ActionMQ服务监控地址:浏览器输入http://127.0.0.1:8161/admin

登录成功,页面显示:

重点关注菜单栏:

【1】Queues:队列

【2】Topics

 ==》停止服务器,只需要按着Ctrl+Shift+C,之后输入y即可

五。应用IDEA构建Maven项目

File-》New-》Module...-》Maven-》勾选-》选择-》Next -》

GroupId:com.jd.myMaven   |    ArtifactId:activeMQ    |    version:默认   -》Finish

项目构建成功!项目结构如下所示:

pom.xml中添加

<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.5.8</version> </dependency> <!-- activemq 相关maven依赖 --> <dependency> <groupId>javax.jms</groupId> <artifactId>jms</artifactId> <version>1.1</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.5.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.7.0</version> </dependency> </dependencies>

六。创建生产者类,模拟生产者发消息

Step1:java/activemq/JMSProducer.java

package activemq; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMapMessage; import javax.jms.*; import java.util.Map; /** * 消息生产者 */ public class JMSProducer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认的连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认的连接密码 private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认的连接地址 private static final int SENDNUM = 2;//发送的消息数量 public static void main(String[] args) { ConnectionFactory connectionFactory = null;//连接工厂 Connection connection = null;//连接 Session session = null;//会话,接受或者发送消息的线程 Destination destination = null;//消息的目的地 MessageProducer messageProducer = null;//消息的生产者 //实例化连接工厂(指定连接用户名|密码|连接地址) connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKERURL); try { connection = connectionFactory.createConnection();//通过连接工厂获取连接 connection.start();//启动连接 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//创建session destination = session.createQueue("TestQueue");//创建消息队列 messageProducer = session.createProducer(destination);//创建消息生产者 sendMessage(session, messageProducer);//发送消息 session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } //发送消息 private static void sendMessage(Session session, MessageProducer messageProducer) { try { //创建消息Map<key,value> MapMessage message = session.createMapMessage(); message.setString("userName", "syz"); message.setInt("age", 30); message.setDouble("salary", 1000); message.setBoolean("isGirl", true); System.out.println("Sending:" + ((ActiveMQMapMessage)message).getContentMap()); //发送消息 messageProducer.send(message); } catch (JMSException e) { e.printStackTrace(); } } }

Step2:启动ActiveMQ,运行生产者类,模拟生产消息

控制台显示:

Sending:{isGirl=true, userName=syf, salary=1000.0, age=30}

打开浏览器,访问activeMQ监控画面 http://127.0.0.1:8161/admin

【1】Queues

【2】Topics

七。模拟消费者消耗数据

package activemq; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMapMessage; import javax.jms.*; /** * 消息消费者 */ public class JMSConsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认的连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认的连接密码 private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认的连接地址 public static void main(String[] args) { ConnectionFactory connectionFactory = null;//连接工厂 Connection connection = null;//连接 Session session = null;//会话,接受或者发送消息的线程 Destination destination = null;//消息的目的地 MessageConsumer messageConsumer = null;//消息的消费者 //实例化连接工厂(指定连接用户名|密码|连接地址) connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKERURL); try { connection = connectionFactory.createConnection();//通过连接工厂获取连接 connection.start();//启动连接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建session destination = session.createQueue("TestQueue");//创建连接的消息队列(TestQueue:生产者队列名) messageConsumer = session.createConsumer(destination);//创建消息消费者 while (true) { MapMessage mapMessage= (MapMessage) messageConsumer.receive(10000); //TextMessage textMessage= (TextMessage) messageConsumer.receive(10000);//10秒接收 if (mapMessage != null) { System.out.println("收到的消息:" + ((ActiveMQMapMessage)mapMessage).getContentMap()); } else { System.out.println("没有消息:"); } } } catch (JMSException e) { e.printStackTrace(); } } }

运行代码:

控制台显示:收到的消息:{userName=syf, salary=1000.0, isGirl=true, age=30}

打开浏览器,查看activeMQ监控画面 http://127.0.0.1:8161/admin

【1】Queues(1条消息被消费)

【2】Topics

 

示例-----企业应用最常见方式之监听器监听方式

增加监听类JMSListener

package activemq; import org.apache.activemq.command.ActiveMQMapMessage; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; /** * 消息监听 * */ public class JMSListener implements MessageListener{ public void onMessage(Message message) { try { System.out.println("receive Message:"+((ActiveMQMapMessage)message).getContentMap()); } catch (JMSException e) { e.printStackTrace(); } } }重新模拟一个消费者,应用监听器监听消息 package activemq; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMapMessage; import javax.jms.*; public class JMSConsumerByListener { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认的连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认的连接密码 private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认的连接地址 public static void main(String[] args) { ConnectionFactory connectionFactory = null;//连接工厂 Connection connection = null;//连接 Session session = null;//会话,接受或者发送消息的线程 Destination destination = null;//消息的目的地 MessageConsumer messageConsumer = null;//消息的消费者 //实例化连接工厂(指定连接用户名|密码|连接地址) connectionFactory = new ActiveMQConnectionFactory(JMSConsumerByListener.USERNAME, JMSConsumerByListener.PASSWORD, JMSConsumerByListener.BROKERURL); try { connection = connectionFactory.createConnection();//通过连接工厂获取连接 connection.start();//启动连接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建session destination = session.createQueue("TestQueue2");//创建连接的消息队列(TestQueue:生产者队列名) messageConsumer = session.createConsumer(destination);//创建消息消费者 messageConsumer.setMessageListener(new JMSListener());//注册消息监听 } catch (JMSException e) { e.printStackTrace(); } } }

=》先执行生成者,生产消息!

   控制台打印:Sending:{isGirl=true, userName=kaixin, salary=1000.0, age=30}

   ActiveMQ监控画面显示:

=》再执行消费者,消费消息!

  控制台打印:receive Message:{userName=kaixin, salary=1000.0, isGirl=true, age=30}

  ActiveMQ监控画面显示:

代码地址 :  https://github.com/shiyuezhong11/activeMQ
转载请注明原文地址: https://www.6miu.com/read-2630567.html

最新回复(0)