activemq的p2p模型比较简答,它好比是两个人打电话,这两个人是独享这条链路,一方发送消息,另外一方接收消息。在实际场景中会有多个用户使用p2p的队列,场景:
队列中一个生产者发送的某个消息,只能被一个消费者所消费掉。
发布订阅模式,相当于是我们订阅了某个博客栏目,每当由新的博客动态就会通知到我们每一个订阅了这个博客的人,这种模式下生产者发送的消息会被订阅了这个消息主题的所有消费者所消费。
我们来测试一下pub/sub模式:
package jeff.mq.pb; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author jeffSheng * 2018年7月3日 */ public class Publish { //1连接工厂 private ConnectionFactory connectionFactory; //2连接对象 private Connection connection; //3Session对象 private Session session; //4生产者 private MessageProducer messageProducer; public Publish(){ try { this.connectionFactory = new ActiveMQConnectionFactory( "jeff", "123456", "tcp://localhost:61616"); this.connection = connectionFactory.createConnection(); this.connection.start(); this.session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); this.messageProducer = session.createProducer(null); } catch (JMSException e) { e.printStackTrace(); } } public void sendMessage() throws Exception{ Destination destination = session.createTopic("topic1"); TextMessage textMessage = session.createTextMessage("我是主题内容"); messageProducer.send(destination,textMessage); } public static void main(String[] args) throws Exception { Publish p = new Publish(); p.sendMessage(); } }我们创建了一个主题topic1,运行三次publish,则向主题中写入三个消息。
看下订阅者代码:
package jeff.mq.pb; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author jeffSheng * 2018年7月3日 */ public class Subscriber { //1连接工厂 private ConnectionFactory connectionFactory; //2连接对象 private Connection connection; //3Session对象 private Session session; //4生产者 private MessageConsumer messageConsumer; //5目标 private Destination destination; public Subscriber(){ try { this.connectionFactory = new ActiveMQConnectionFactory( "jeff", "123456", "tcp://localhost:61616"); this.connection = connectionFactory.createConnection(); this.connection.start(); this.session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); this.destination = this.session.createTopic("topic1"); //创建消费者的时候指定了选择器 this.messageConsumer = session.createConsumer(this.destination); this.messageConsumer.setMessageListener(new Listener()); } catch (JMSException e) { e.printStackTrace(); } } class Listener implements MessageListener{ @Override public void onMessage(Message message) { try { if(message instanceof TextMessage){ TextMessage ret = (TextMessage)message; System.out.println(ret.toString()); } if(message instanceof MapMessage){ MapMessage ret = (MapMessage)message; System.out.println(ret.toString()); } } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) throws Exception { Subscriber subscriber = new Subscriber(); } }启动订阅者,发现3个消息并没有被消费掉!
原因是,订阅者必须在发布者发布主题消息之前订阅这个主题才可以。于是我们先启动订阅者订阅主题:再启动发布者发布主题消息!
我们重新试下,准备三个订阅者代码,并启动这三个订阅者订阅topic1主题,然后启动生产者发布主题消息:
三个订阅者都收到了消息,activemq的控制台:
解释:发布者发布了1条消息,3个订阅者消费了三条消息!
Jeff.Star 认证博客专家 Java [努力支撑经历,经历支撑能力.][思路决定出路,细节决定成败.][聚焦,分享,转化,参与.][数据在流动,技术在流动,我们也要流动.]微信:TiensC