Jms基础知识整理

xiaoxiao2026-05-09  11

开始文章之前先澄清几个概念 什么是消息 消息是一个用于在组件和应用程序之间通讯的的方法。消息之间的传递是点对点的。任何终端之间都可以相互接受和发送消息。并且每个终端都必须遵守如下的规则 -> 创建消息 -> 发送消息 -> 接收消息 -> 读取消息 为什么要使用消息 理由很简单,消息是一个分布式的低耦合通讯方案。A发送一个消息到一个agent ,B作为接受者去agent上获取消息。但是A,B不需要同时到agent上去注册。agent作为一个中转为A,B提供搞效率的通讯服务。开发者的关注点 走到这里,我也不想去解释jms spec上那些抽象且复杂的概念了,说的很白,1年多了我自己也没弄懂是个什么东西,也没时间从头到尾去仔细的看,同时我认为没必要,我所关注的是如何让jms跑起来,并且工作正常,所以spec只是个字典,当我需要用的时候才去查。开发者的jms环境 遵守简单明了的原则,所谓jms环境只是2个对象1> ConnectionFactory 2> Destination 通常Provider会提供JNDI的对象获取,具体方法可以去Privider的网站上搜索jndi support下面我以jbossMq为介质跑一个简单的jms,为了保证jms的本质清晰,我没有使用jbossMq的Api,而是直接调用的jms Api.

java 代码   package  com.javaeye.jms.jboss;      import  javax.jms.Connection;   import  javax.jms.ConnectionFactory;   import  javax.jms.Destination;   import  javax.jms.JMSException;   import  javax.jms.MessageConsumer;   import  javax.jms.MessageProducer;   import  javax.jms.Queue;   import  javax.jms.QueueSender;   import  javax.jms.Session;   import  javax.jms.TextMessage;   import  javax.naming.Context;   import  javax.naming.InitialContext;   import  javax.naming.NamingException;      public   class  JbossNativeJmsImpl {             /**        * @author zuly        *        * following jms ptp domain, use an simple text message to test        *        * A jms ptp sender will following the steps below!        *     1> get an ConnectionFactory use JNDI Lookup Or Initial it yourself        *     2> use this ConnectionFactory to start a jms connection      *        [spec to jms 1.1 api to get the main idea of it ]        *     3> use connection to create a jms session        *     4> get a queue destination / messege agent        *     5> start the Producer[jms1.1 spec] by a session        *     6> get messege Object or initial it yourself by implements the messegeor       *        it's sub interfaces        *     7> call sender or send it selfing        *     8> finallized the connection object or it will throw a warning to you!        *        * @param messege        * @throws NamingException        * @throws JMSException        */        public   void  sendingProcessing(String messege)  throws  NamingException, JMSException{           Context ctx = new  InitialContext();           ConnectionFactory cf = (ConnectionFactory) ctx.lookup("java:JmsXA" );           Connection conn = cf.createConnection();           Session session = conn.createSession(false , Session.AUTO_ACKNOWLEDGE);           Destination dest = (Queue) ctx.lookup("queue/A" );           MessageProducer msgp = session.createProducer(dest);           QueueSender sender = (QueueSender) msgp;           TextMessage msg = session.createTextMessage();           msg.setText(messege);           sender.send(msg);           conn.close();       }                         /**        * @author zuly        *        * following jms ptp domain, use an simple text message to test        *        * A jms ptp retriver will following the steps below!        *     1> get an ConnectionFactory use JNDI Lookup Or Initial it yourself        *     2> use this ConnectionFactory to start a jms connection       *        [spec to jms 1.1 api to get the main idea of it ]        *     3> use connection to create a jms session        *     4> get a queue destination / messege agent        *     5> retrive a consumer from session        *     6> start the jms connection to retrivte the message        *     7> get message from consumer        *         * @return textMessege        * @throws NamingException        * @throws JMSException        */        public  String retriveingProcessing()  throws  NamingException, JMSException{           Context ctx = new  InitialContext();           ConnectionFactory cf = (ConnectionFactory) ctx.lookup("java:JmsXA" );           Connection conn = cf.createConnection();           Session session = conn.createSession(false , Session.AUTO_ACKNOWLEDGE);           Destination dest = (Queue) ctx.lookup("queue/A" );           MessageConsumer msgconsumer = session.createConsumer(dest);           //MessageListener ml = new JmsListenner();            //msgconsumer.setMessageListener(ml);            conn.start();           TextMessage msg = (TextMessage) msgconsumer.receive();           conn.close();           System.out.println("messege is"  + msg.getText());           return  msg.getText();       }   }  

<point to="" point=""></point> <point to="" point=""></point> 注意retrive函数中comment的掉的两行,消息Listener的作用是实现异步通讯,但是它有一个约定,必须和发送者 保持物理上的分离,针对于jboss而言,就要求这个Listener必须跑在容器外面。这是一个很搞的问题,每天Jms的邮件列表里面都有无数的这样的 问题发过来。但是回复的人很少。我自己也从来不回复。 其实我也不清楚写这篇文章到底是出于什么目的,怕只是让这么一个简单的问题有一个回答而已。 把下面这个程序跑起来就可以异步接受消息了。

java 代码   package  com.javaeye.jms.jboss;      import  java.util.Properties;      import  javax.jms.Connection;   import  javax.jms.ConnectionFactory;   import  javax.jms.Destination;   import  javax.jms.JMSException;   import  javax.jms.MessageConsumer;   import  javax.jms.MessageListener;   import  javax.jms.Session;   import  javax.naming.Context;   import  javax.naming.InitialContext;   import  javax.naming.NamingException;      import  com.javaeye.spring.services.jms.mdp.JmsListenner;      public   class  JbossJmsAsync {          /**        * @param args        * @throws NamingException         * @throws JMSException         */        public   static   void  main(String[] args)  throws  NamingException, JMSException {           Properties pops = new  Properties();           pops.setProperty("jboss.bind.address""0.0.0.0" );           pops.setProperty("java.naming.factory.initial""org.jnp.interfaces.NamingContextFactory" );           pops.setProperty("java.naming.factory.url.pkgs""org.jboss.naming:org.jnp.interfaces" );           pops.setProperty("java.naming.provider.url""localhost" );           Context ctx = new  InitialContext(pops);           ConnectionFactory cf = (ConnectionFactory) ctx.lookup("ConnectionFactory" );           Connection conn = cf.createConnection();           Session session = conn.createSession(false , Session.AUTO_ACKNOWLEDGE);           Destination dest = (Destination) ctx.lookup("queue/A" );           MessageConsumer msgConsumer = session.createConsumer(dest);           MessageListener ml = new  JmsListenner();           msgConsumer.setMessageListener(ml);            conn.start();       }      }  

javaeye的主题好像是spring,为了迎合领导,下面我把这套东西跑在spring里面。同时我发现spring对jms的包装真的简单,而且还提供了一个模版,虽然这个模版的接口是在是很罗唆。 ps:今天是第1次用spring在reference里找了半天找不到方法注入的办法,于是google了一个注入办法,不合理的地方请大家指出。首先我通过方法来注入ConnectionFactory和Destination这两个对象来支撑jms环境

java 代码   package  com.javaeye.spring.services.jms.mdp;      import  java.util.Properties;      import  javax.jms.ConnectionFactory;   import  javax.jms.Destination;   import  javax.jms.Queue;   import  javax.naming.Context;   import  javax.naming.InitialContext;   import  javax.naming.NamingException;      public   class  UserJmsTransactionUtil {          private  String connectionFactoryJndiLookUp;             private  String destinationJndiLookUp;             private  String localConnectionFactoryJndiLookUp;             private  String containerType;                   public  String getConnectionFactoryJndiLookUp() {           return  connectionFactoryJndiLookUp;       }                public   void  setConnectionFactoryJndiLookUp(String connectionFactoryJndiLookUp) {           this .connectionFactoryJndiLookUp = connectionFactoryJndiLookUp;       }                public  String getDestinationJndiLookUp() {           return  destinationJndiLookUp;       }                public   void  setDestinationJndiLookUp(String destinationJndiLookUp) {           this .destinationJndiLookUp = destinationJndiLookUp;       }                public  ConnectionFactory getConnectionFactory()  throws  NamingException{           Context ctx = new  InitialContext();           ConnectionFactory cf = (ConnectionFactory) ctx.lookup(connectionFactoryJndiLookUp);           return  cf;       }                   public  Destination getJmsDestination()  throws  NamingException{           Context ctx = new  InitialContext();           Destination dest = (Queue) ctx.lookup(destinationJndiLookUp);           return  dest;       }                   public  ConnectionFactory getQueueConnectionFactory()  throws  NamingException{           Properties pops = new  Properties();           pops.setProperty("jboss.bind.address""0.0.0.0" );           pops.setProperty("java.naming.factory.initial""org.jnp.interfaces.NamingContextFactory" );           pops.setProperty("java.naming.factory.url.pkgs""org.jboss.naming:org.jnp.interfaces" );           pops.setProperty("java.naming.provider.url""localhost" );           Context ctx = new  InitialContext(pops);           ConnectionFactory cf = (ConnectionFactory) ctx.lookup(localConnectionFactoryJndiLookUp);           return  cf;       }                   public  Destination getLocalJmsDestination()  throws  NamingException{           Properties pops = new  Properties();           pops.setProperty("jboss.bind.address""0.0.0.0" );           pops.setProperty("java.naming.factory.initial""org.jnp.interfaces.NamingContextFactory" );           pops.setProperty("java.naming.factory.url.pkgs""org.jboss.naming:org.jnp.interfaces" );           pops.setProperty("java.naming.provider.url""localhost" );           Context ctx = new  InitialContext(pops);           Destination dest = (Destination) ctx.lookup(destinationJndiLookUp);           return  dest;       }                public  String getLocalConnectionFactoryJndiLookUp() {           return  localConnectionFactoryJndiLookUp;       }                public   void  setLocalConnectionFactoryJndiLookUp(               String localConnectionFactoryJndiLookUp) {           this .localConnectionFactoryJndiLookUp = localConnectionFactoryJndiLookUp;       }      }

发送端的配置如下

xml 代码   < beans >        < bean   id = "userJmsUtil"   class = "com.javaeye.spring.services.jms.mdp.UserJmsTransactionUtil" >            < property   name = "connectionFactoryJndiLookUp"   value = "java:JmsXA" > <!-- -->property >            < property   name = "destinationJndiLookUp"   value = "queue/A" > <!-- -->property >            < property   name = "localConnectionFactoryJndiLookUp"   value = "ConnectionFactory" > <!-- -->property >        <!-- -->bean >           < bean   id = "connectionFactory"   class = "org.springframework.beans.factory.config.MethodInvokingFactoryBean" >            < property   name = "targetObject"   ref = "userJmsUtil" > <!-- -->property >            < property   name = "targetMethod"   value = "getConnectionFactory" > <!-- -->property >        <!-- -->bean >               < bean   id = "queue"   class = "org.springframework.beans.factory.config.MethodInvokingFactoryBean" >            < property   name = "targetObject"   ref = "userJmsUtil" > <!-- -->property >            < property   name = "targetMethod"   value = "getJmsDestination" > <!-- -->property >        <!-- -->bean >                   < bean   id = "jmsQueue"   class = "org.springframework.jms.core.JmsTemplate" >            < property   name = "connectionFactory"   ref = "connectionFactory" > <!-- -->property >            < property   name = "defaultDestination"   ref = "queue" > <!-- -->property >            < property   name = "messageConverter" >                < bean   class = "org.springframework.jms.support.converter.SimpleMessageConverter" > <!-- -->bean >            <!-- -->property >        <!-- -->bean >    <!-- -->beans >   

ps:javaeye的模版工具bug还真多,不管了. 如果使用Listenner的化,一样需要遵守发送者和接收者物理隔离的原则,我的做法是把发送者配到一个xml中,在把接受者配到另外一个xml中去,发送的配置绑定到容器里,接收者的跑在本地.否则spring初始化是过不去的. 下面这个程序是发送消息的程序.使用了spring的模版,发条消息比new个对象还简单.同时spring还提供了适配器的接口,一样通过声明式的配 置,这样可以在同一个接口里发送各种类型的消息了.同时支持事务,我还不知道这个有什么用呵呵,第1次使用嘛!但是就使用上来说,spring是最简单 的.2者都只需要注入一个对象而已.

java 代码   @Test   public   void  send(){       ApplicationContext ac = new  FileSystemXmlApplicationContext( "jms.xml" );       BeanFactory bf = ac;       JmsTemplate jt = (JmsTemplate) bf.getBean("jmsQueue" );       jt.convertAndSend("2132134" );   }  

接收端的配置如下

xml 代码   xml   version = "1.0"   encoding = "UTF-8" ?>    >    < beans >           < bean   id = "listenner"   class = "com.javaeye.spring.services.jms.mdp.JmsListenner" > <!-- -->bean >               < bean   id = "userJmsUtil"   class = "com.javaeye.spring.services.jms.mdp.UserJmsTransactionUtil" >            < property   name = "connectionFactoryJndiLookUp"   value = "java:JmsXA" > <!-- -->property >            < property   name = "destinationJndiLookUp"   value = "queue/A" > <!-- -->property >            < property   name = "localConnectionFactoryJndiLookUp"   value = "ConnectionFactory" > <!-- -->property >        <!-- -->bean >           < bean   id = "localConnectionFactory"   class = "org.springframework.beans.factory.config.MethodInvokingFactoryBean" >            < property   name = "targetObject"   ref = "userJmsUtil" > <!-- -->property >            < property   name = "targetMethod"   value = "getQueueConnectionFactory" > <!-- -->property >        <!-- -->bean >               < bean   id = "localDestination"   class = "org.springframework.beans.factory.config.MethodInvokingFactoryBean" >            < property   name = "targetObject"   ref = "userJmsUtil" > <!-- -->property >            < property   name = "targetMethod"   value = "getLocalJmsDestination" > <!-- -->property >        <!-- -->bean >               < bean   id = "listenerContainer"   class = "org.springframework.jms.listener.DefaultMessageListenerContainer" >            < property   name = "concurrentConsumers"   value = "5" > <!-- -->property >            < property   name = "connectionFactory"   ref = "localConnectionFactory" > <!-- -->property >            < property   name = "destination"   ref = "localDestination" > <!-- -->property >            < property   name = "messageListener"   ref = "listenner" > <!-- -->property >        <!-- -->bean >    <!-- -->beans >   

接收端由于需要从jbossmq里取ConnectionFactory和Destination,所以,我调用的是userJmsUtil的localLookup.这个函数的作用等同于发送者的那个函数,只不过前者是容器外获取,而后者是容器内的而已.

java 代码   package  com.javaeye.spring.services.jms.mdp;      import  javax.jms.JMSException;   import  javax.jms.Message;   import  javax.jms.MessageListener;   import  javax.jms.TextMessage;      public   class  JmsListenner  implements  MessageListener {          public   void  onMessage(Message message) {           try  {               TextMessage msg = (TextMessage) message;               System.out.println(msg.getText());           } catch  (JMSException e) { e.printStackTrace(); }       }      } 

spring对jms的整合里提到了一个jms provider ActiveMQ,要用一个开源框架要做的第一件事就是先跑一个demo起来,同样,我们要做的事还是获取ConnectionFactory和 Destination对象,还好,ActiveMQ的JNDI实现比jbossMQ还要简单,直接通过一个本地的Context就可以查到了,具体的可 以参照ActiveMQ官方的支持文档.

转载请注明原文地址: https://www.6miu.com/read-5048465.html

最新回复(0)