Apache-ActiveMQ:开源消息中介
先下载ActiveMQ,在网上找的版本是5.11.1,然后在bin中启动activeMQ.(备注:启动之前需要配置好环境变量,并且确定JDK的版本。我这边用的是64位的JDK)
启动后的照片如下:
访问的端口是:http://localhost:8161/ 可以直接进入admin界面,用户名和密码都是admin,界面如下:
经常会查看queues,topics,即为消息队列和订阅者发布者。
如果想要停掉服务在控制台需要按ctrl+shift+c,ctrl+c ,Y 。我这边是这样停的,也有人说去掉中间的ctrl+c。
编写代码如下:
消息生产者:
package com.liang.test; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息的发布者(发送者) */ public class JMSProducer { private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; //默认连接用户名 private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码 private static final String URL=ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址 private static final int SIZE=11; //发送的消息数量 public static void main(String[] args) { ConnectionFactory factory;//连接工厂 Connection connection;//连接 Session session;//会话 接受或者发送消息的线程 Destination destination;//消息的目的地 MessageProducer messageProducer;//消息生产者 factory=new ActiveMQConnectionFactory(USERNAME, PASSWORD, URL);//实例化连接工厂 try { connection=factory.createConnection();//通过连接工厂获取连接 connection.start();//启动连接 session=connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//创建session //destination = session.createQueue("testQueue1");//point to point//创建消息队列 destination=session.createTopic("testTopic1");//topic //创建消息队列 messageProducer=session.createProducer(destination);//创建消息生产者 sendMessage(session, messageProducer);//发送消息 session.commit(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 发送消息 */ public static void sendMessage(Session session,MessageProducer messageProducer){ for (int i = 0; i < SIZE; i++) { try { TextMessage message=session.createTextMessage("消息发布者,发布消息"+i);//创建一条文本消息 messageProducer.send(message);//通过消息生产者发出消息 } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } 消息接收者: import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JMSConsumer1 { private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; private static final String URL=ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection; Session session; Destination destination; MessageConsumer consumer; connectionFactory=new ActiveMQConnectionFactory(USERNAME, PASSWORD, URL); try { connection=connectionFactory.createConnection(); connection.start(); session=connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // destination = session.createQueue("testQueue1"); //point to point destination=session.createTopic("testTopic1"); //topic consumer=session.createConsumer(destination); /* while (true) { TextMessage textMessage = (TextMessage) messageConsumer.receive(100000); if(textMessage != null){ System.out.println("收到的消息:" + textMessage.getText()); }else { break; } } */ consumer.setMessageListener(new Listener1()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class Listener1 implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("订阅者1:"+((TextMessage)message).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } 备注:我测试的时候,消息队列的接收方是一直启动的。而发送方是发送完就停掉了。订阅者和发布者也是一样。要先启动订阅者,在启动发布者。