JMS 实现之 ActiveMQ

xiaoxiao2021-02-28  13

环境
activemq-parent-5.15.3 + jdk1.8 + mvn3

1.ActiveMQ 介绍及安装

暂时省略了,网上相关的文档很多,也建议各位参照官网文档安装.好处多多,既因为写的挺详细的,也可以锻炼一下自己英文能力

2.案例分享 这个是关键,因为使用到了 ActiveMQ,所以需要开启 ActiveMQ,可以通过在管理界面更加直观的看到信息收取和读取. 废话少说,放码出来

点对点传送 这个相对比较简单一点,就是发布者向 Queue 发送信息,然后接收者读取;对于多个接收者将分别读取 Queue 中的信息

package com.example.demo.jms.queue; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 消息发送器 */ public class MyQueueSender { // 发送次数 public static final int SEND_NUM = 10; // tcp 地址 public static final String BROKER_URL = "tcp://localhost:61616"; // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/ public static final String DESTINATION_NAME = "myQueue"; /** * 发送消息 */ public static void sendMessage(Session session, MessageProducer producer) throws Exception { for (int i = 0; i < SEND_NUM; i++) { String message = "发送消息第" + (i + 1) + "条"; TextMessage text = session.createTextMessage(message); producer.send(text); } } /** * <p> * 发送消息的基本步骤: * 1) 创建连接使用的工厂类JMS ConnectionFactory * 2) 使用管理对象JMS ConnectionFactory建立连接Connection,并启动 * 3) 使用连接Connection 建立会话Session * 4) 使用会话Session和管理对象Destination创建消息生产者 MessageProducer * 5) 使用消息生产者 MessageProducer 发送消息 * * @throws Exception */ public static void run() throws Exception { Connection connection = null; Session session = null; try { // 创建链接工厂 ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL); // 通过工厂创建一个连接 connection = factory.createConnection(); // 启动连接 connection.start(); // 创建一个session会话 // 参数一:是否开启事务 true开启 ,false不开启事务,如果开启记得手动提交 // 参数二:签收模式,一般使用的有自动签收和客户端自己确认签收 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建一个 myQueue 队列 Destination destination = session.createQueue(DESTINATION_NAME); // 创建消息制作者 MessageProducer producer = session.createProducer(destination); // 设置持久化模式 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // producer.setDeliveryDelay(1000L); // producer.setDisableMessageID(false); // producer.setDisableMessageTimestamp(false); // producer.setPriority(1); // producer.setTimeToLive(1000L); sendMessage(session, producer); // 提交会话 session.commit(); } catch (Exception e) { throw e; } finally { // 关闭释放资源 if (session != null) { session.close(); } if (connection != null) { connection.close(); } } } public static void main(String[] args) throws Exception { MyQueueSender.run(); } }

发送的消息通过管理界面可以查看到

package com.example.demo.jms.queue; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class MyQueueReceiver { // tcp 地址 public static final String BROKER_URL = "tcp://localhost:61616"; // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/ public static final String DESTINATION_NAME = "myQueue"; // 并发测试数量 public static final int THREAD_NUM = 2; /** * 消息接收者从JMS接受消息的步骤 * <p> * 1) 创建连接使用的工厂类JMS ConnectionFactory * 2) 使用管理对象JMS ConnectionFactory 建立连接 Connection 并启动 * 3) 使用连接 Connection 建立会话 Session * 4) 使用会话 Session 和管理对象 Destination 创建消息接收者 MyQueueReceiver * 5) 使用消息接收者 MyQueueReceiver 接受消息 * * @throws Exception */ public static void run() throws Exception { Connection connection = null; Session session = null; try { // 创建链接工厂 ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL); // 通过工厂创建一个连接 connection = factory.createConnection(); // 启动连接 connection.start(); // 创建一个session会话 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建一个队列 Destination destination = session.createQueue(DESTINATION_NAME); // 创建一个主题 // Destination destination = session.createTopic(DESTINATION_NAME); // 创建消息制作者 MessageConsumer consumer = session.createConsumer(destination); while (true) { // 接收数据的时间(等待) 10s Message message = consumer.receive(1000 * 10); TextMessage text = (TextMessage) message; if (text != null) { // 打印每个接收者接收到的信息 System.out.println(Thread.currentThread().getName() + "接收:" + text.getText()); } else { break; } } // 提交会话 session.commit(); } catch (Exception e) { throw e; } finally { // 关闭释放资源 if (session != null) { session.close(); } if (connection != null) { connection.close(); } } } public static void main(String[] args) throws Exception { for (int i = 0; i < THREAD_NUM; i++) { new Thread(new Runnable() { @Override public void run() { try { MyQueueReceiver.run(); } catch (Exception e) { e.printStackTrace(); } } }).start(); } } }

管理页面会显示消费者数量,并且在接收到消息之后,会更新待处理消息

客户端打印接收的信息,有两个接收者,因为点对点传送中每条消息只能由一个接收者使用,所以每个接收者分别读取了队列中的信息.

发布/订阅传送 必须要订阅者在线,然后发布者再发送信息,订阅者才能接收到消息;对于多个订阅者,都将能接收到 Topic 中的信息. 注意:请先运行订阅者,再运行发布者.如果先运行发布者,则发布的信息不能被订阅者接收

package com.example.demo.jms.topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 发布/订阅消息传送 -- 发布者 */ public class MyTopicPublisher { // 发送次数 public static final int SEND_NUM = 5; // tcp 地址 public static final String BROKER_URL = "tcp://localhost:61616"; // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/ public static final String DESTINATION_NAME = "myTopic"; /** * 发送消息 */ public static void sendMessage(Session session, MessageProducer publisher) throws Exception { for (int i = 0; i < SEND_NUM; i++) { String message = "发送消息第" + (i + 1) + "条"; TextMessage text = session.createTextMessage(message); publisher.send(text); } } /** * <p> * 发送消息的基本步骤: * 1) 创建连接使用的工厂类JMS ConnectionFactory * 2) 使用管理对象JMS ConnectionFactory建立连接Connection,并启动 * 3) 使用连接Connection 建立会话Session * 4) 使用会话Session和管理对象Destination创建消息生产者 MessageSender * 5) 使用消息生产者 MessageSender 发送消息 * * @throws Exception */ public static void run() throws Exception { Connection connection = null; Session session = null; try { // 创建链接工厂 ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL); // 通过工厂创建一个连接 connection = factory.createConnection(); // 启动连接 connection.start(); // 创建一个session会话 // 参数一:是否开启事务 true开启 ,false不开启事务,如果开启记得手动提交 // 参数二:签收模式,一般使用的有自动签收和客户端自己确认签收 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建一个 myTopic 主题 Destination destination = session.createTopic(DESTINATION_NAME); // 创建消息制作者 MessageProducer producer = session.createProducer(destination); // 设置持久化模式 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); sendMessage(session, producer); // 提交会话 session.commit(); } catch (Exception e) { throw e; } finally { // 关闭释放资源 if (session != null) { session.close(); } if (connection != null) { connection.close(); } } } public static void main(String[] args) throws Exception { MyTopicPublisher.run(); } }

图片显示有两个订阅者,主题发布了5条信息

package com.example.demo.jms.topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 发布/订阅消息传送 -- 订阅者 */ public class MyTopicSubscriber { // tcp 地址 public static final String BROKER_URL = "tcp://localhost:61616"; // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp public static final String DESTINATION_NAME = "myTopic"; // 并发测试数量 public static final int THREAD_NUM = 2; /** * 消息接收者从JMS接受消息的步骤 * <p> * 1) 创建连接使用的工厂类JMS ConnectionFactory * 2) 使用管理对象JMS ConnectionFactory 建立连接 Connection 并启动 * 3) 使用连接 Connection 建立会话 Session * 4) 使用会话 Session 和管理对象 Destination 创建消息接收者 MyTopicSubscriber * 5) 使用消息接收者 MyTopicSubscriber 接受消息,需要用 setMessageListener 将 MessageListener 接口绑定到MessageReceiver消息接收者必须实现了MessageListener接口,需要定义onMessage事件方法。 * * @throws Exception */ public static void run() { Connection connection = null; Session session = null; try { // 创建链接工厂 ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL); // 通过工厂创建一个连接 connection = factory.createConnection(); // 启动连接 connection.start(); // 创建一个session会话 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建一个主题 Destination destination = session.createTopic(DESTINATION_NAME); // 创建消息制作者 MessageConsumer subscriber = session.createConsumer(destination); // subscriber.setMessageListener(new MessageListener() { // public void onMessage(Message message) { // try { // TextMessage text = (TextMessage) message; // System.out.println(Thread.currentThread().getName() + " 接收:" + text.getText()); // } catch (JMSException e) { // e.printStackTrace(); // } // } // }); // Thread.sleep(1000 * 100); while (true) { // 接收数据的时间(等待) 10s Message message = subscriber.receive(1000 * 100); TextMessage text = (TextMessage) message; if (text != null) { // 打印每个接收者接收到的信息 System.out.println(Thread.currentThread().getName() + "接收:" + text.getText()); } else { break; } } // 提交会话 session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { // 关闭释放资源 try { if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } } public static void main(String[] args) throws Exception { for (int i = 0; i < THREAD_NUM; i++) { new Thread(new Runnable() { @Override public void run() { try { MyTopicSubscriber.run(); } catch (Exception e) { e.printStackTrace(); } } }).start(); } } }

有两个订阅者,所以 Messages Dequeued 一列显示被消费了发布信息*订阅者数量

客户端打印信息.发布/订阅传送则每个订阅者都能接收到信息

上面的例子都是非持久化信息,针对于持久化信息,采用长期订阅者,关于长期订阅者标识是根据创建该订阅者的连接的 ClientID 属性以及创建订阅者时指定的订阅者名构造的。后续补上.

参考文档 1.https://blog.csdn.net/jwdstef/article/details/17380471

转载请注明原文地址: https://www.6miu.com/read-1650126.html

最新回复(0)