JMS + ActiveMQ 简单的demo

xiaoxiao2021-02-27  194

JMS两个主要概念:     消息中介     消息目标:         1、队列         2、主题 两种传递模式:     点对点消息传递模型:每个消息都有一个发送者和一个消费者             发送者-〉队列-〉接受者     发布-订阅者消息传递模型:一条订阅信息,可以发送给多个订阅者         发布者-〉主题-〉订阅者们 JMS优点:     不用等待     面向消息     位置独立     确保投送

Apache-ActiveMQ:开源消息中介

先下载ActiveMQ,在网上找的版本是5.11.1,然后在bin中启动activeMQ.(备注:启动之前需要配置好环境变量,并且确定JDK的版本。我这边用的是64位的JDK)

启动后的照片如下:

访问的端口是:http://localhost:8161/     可以直接进入admin界面,用户名和密码都是admin,界面如下:

经常会查看queues,topics,即为消息队列和订阅者发布者。

如果想要停掉服务在控制台需要按ctrl+shift+c,ctrl+c ,Y 。我这边是这样停的,也有人说去掉中间的ctrl+c。

编写代码如下:

消息生产者:

package com.liang.test; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息的发布者(发送者) */ public class JMSProducer { private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; //默认连接用户名 private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码 private static final String URL=ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址 private static final int SIZE=11; //发送的消息数量 public static void main(String[] args) { ConnectionFactory factory;//连接工厂 Connection connection;//连接 Session session;//会话 接受或者发送消息的线程 Destination destination;//消息的目的地 MessageProducer messageProducer;//消息生产者 factory=new ActiveMQConnectionFactory(USERNAME, PASSWORD, URL);//实例化连接工厂 try { connection=factory.createConnection();//通过连接工厂获取连接 connection.start();//启动连接 session=connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//创建session //destination = session.createQueue("testQueue1");//point to point//创建消息队列 destination=session.createTopic("testTopic1");//topic //创建消息队列 messageProducer=session.createProducer(destination);//创建消息生产者 sendMessage(session, messageProducer);//发送消息 session.commit(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 发送消息 */ public static void sendMessage(Session session,MessageProducer messageProducer){ for (int i = 0; i < SIZE; i++) { try { TextMessage message=session.createTextMessage("消息发布者,发布消息"+i);//创建一条文本消息 messageProducer.send(message);//通过消息生产者发出消息 } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } 消息接收者:

import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JMSConsumer1 { private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; private static final String URL=ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection; Session session; Destination destination; MessageConsumer consumer; connectionFactory=new ActiveMQConnectionFactory(USERNAME, PASSWORD, URL); try { connection=connectionFactory.createConnection(); connection.start(); session=connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // destination = session.createQueue("testQueue1"); //point to point destination=session.createTopic("testTopic1"); //topic consumer=session.createConsumer(destination); /* while (true) { TextMessage textMessage = (TextMessage) messageConsumer.receive(100000); if(textMessage != null){ System.out.println("收到的消息:" + textMessage.getText()); }else { break; } } */ consumer.setMessageListener(new Listener1()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class Listener1 implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("订阅者1:"+((TextMessage)message).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } 备注:

我测试的时候,消息队列的接收方是一直启动的。而发送方是发送完就停掉了。订阅者和发布者也是一样。要先启动订阅者,在启动发布者。

转载请注明原文地址: https://www.6miu.com/read-17006.html

最新回复(0)