1.JMS
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
1.1 基本构件
连接工厂(ConnectionFactory):用于创建连接,在activemq中提供了集中连接工厂ActiveMQConnectionFactory。连接(Connection):JMS Connection封装了客户与JMS提供者之间的一个虚拟的连接。会话(Session):JMS Session是生产和消费消息的一个单线程上下文。用于创建消息生产者(MessageProvider),消息消费者(MessageConsumer)和消息(Message)。目的地(Destination):目的地是客户用来指定它生产的消息的目标和它消费的消息的来源的对象。消息生产者(MessageProvider):消息生产者是由会话创建的一个对象,用于把消息发送到一个目的地。消息消费者(MessageConsumer):消息消费者是由会话创建的一个对象,它用于接收发送到目的地的消息。消息(Message):通信内容的载体,JMS消息由三部分组成:消息头,消息属性,消息体。
1.2 消息传递模型
1.2.1 点对点(point-to-point,简称PTP)
在点对点消息传送模型中,应用程序由消息队列,发送者,接收者组成。每一个消息发送给一个特殊的消息队列,该队列保存了所有发送给它的消息(除了被接收者消费掉的和过期的消息)。
点对点消息模型有一些特性,如下:
每个消息只有一个接收者;消息发送者和接收者并没有时间依赖性;当消息发送者发送消息的时候,无论接收者程序在不在运行,都能获取到消息;当接收者收到消息的时候,会发送确认收到通知(acknowledgement)。
从图中可以看出,应用1和应用2向Queue中发送消息,这些消息根据到达的顺序排成一个队列,应用3和应用4进行消费,应用3消费了M5,那么应用4就消费不到M5了。发送的消息不确定能被哪个应用所消费,但是一个消息只能被一个应用所消费。
消息发送者和接受者没有时间依赖性,在消息发送到队列之后,只要有消费者,那么消息就被消费了。
1.2.2 发布/订阅(publish/subscribe,简称pub/sub)
在发布/订阅消息模型中,发布者发布一个消息,该消息通过topic传递给所有的客户端。在这种模型中,发布者和订阅者彼此不知道对方,是匿名的且可以动态发布和订阅topic。topic主要用于保存和传递消息,且会一直保存消息直到消息被传递给客户端。
发布/订阅消息模型特性如下:
一个消息可以传递给多个订阅者发布者和订阅者有时间依赖性,只有当客户端创建订阅后才能接受消息,且订阅者需一直保持活动状态以接收消息。
为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
从图中可以看出,应用1和应用2发送消息到topic,topic按照消息到达顺序将消息排列成一个队列,然后应用3和应用4消费的消息都是相同的。
发布者和订阅者有时间依赖性,例如应用4在应用2发送M5时没有运行。那么应用4将不会接收到M5。
当设置了可持久化的消息订阅,那么topic会将消息持久化到数据库,等待应用4上线之后发送给它。
1.2.3 持久化传输和持久化订阅
一开始我觉得这两个东西是差不多的。后来理解了一下发现是两种不同的东西。
传输模式(DeliveryMode):persistent and non-persistent
ActiveMQ支持两种传输模式:持久传输和非持久传输(persistent and non-persistent delivery)。默认情况下使用的是持久传输。
持久传输和非持久传输的区别就在于,消息发送之后会不会持久化到本地。使用持久化传输之后,消息在传输过程中会被存储到本地。在发生服务器宕机或者其他的事故时,消息能被保存下来。如果使用的是非持久化传输,那么消息只会在内存中,服务器死机或重启,内存中的消息就丢失了。
持久化订阅(DurableSubscriber)
持久化订阅是针对publisher/subscriber模型提出的。
通俗的讲,持久化订阅和非持久化订阅最大的区别就在于,消息会不会等着消费者来消费。举个例子:abc三个人(消费者)分(消费)一块蛋糕(topic中的消息)
持久化订阅:a迟到了,b和c先吃,留下一份给a,然后等a来了,a也吃的到同一份蛋糕。
非持久化订阅:a迟到了,b和c先吃了,不留给a,然后a来了,蛋糕吃完了。a只能等着吃下一份蛋糕了。
2.ActiveMQ
2.1 消息队列创建流程
JMS消息队列创建流程
创建ConnectionFactory工厂
通过工厂创建Connection连接通过连接创建一个连接回话Session通过Session创建消息的生产者MessageProducer和消息的消费者MessageConsumer;同时Session也创建一个消息Mesage。消息的生产者MessageProducer将该消息发送到目的地中Destination;消费者同时监听该消息目的地Destination。
2.1.1 PTP方式实例
public class Producer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
private static final String BROKEURL = "failover://"+"tcp://10.1.10.104:61616"; // 默认的连接地址
private static final int SENDNUM = 1000; // 发送的消息数量
public static void main(String[] args) {
ConnectionFactory connectionFactory; // 连接工厂
Connection connection = null; // 连接
Session session; // 会话 接受或者发送消息的线程
Destination destination; // 消息的目的地
MessageProducer messageProducer; // 消息生产者
// 实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(Producer.USERNAME,
Producer.PASSWORD, Producer.BROKEURL);
try {
connection = connectionFactory.createConnection(); // 通过连接工厂获取连接
connection.start(); // 启动连接
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE); // 创建Session
destination = session.createQueue("FirstQueue1"); // 创建消息队列
messageProducer = session.createProducer(destination); // 创建消息生产者
sendMessage(session, messageProducer); // 发送消息
session.commit();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
/**
* 发送消息
* @param session
* @param messageProducer
* @throws Exception
*/
public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{
for(int i=0;i<Producer.SENDNUM;i++){
TextMessage message=session.createTextMessage("ActiveMQ 发送的消息"+i);
System.out.println("发送消息:"+"ActiveMQ 发送的消息"+i);
messageProducer.send(message);
}
}
}
public class Consumer {
private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
private static final String BROKEURL= "failover://"+"tcp://10.1.10.104:61616"; // 默认的连接地址
public static void main(String[] args) {
ConnectionFactory connectionFactory; // 连接工厂
Connection connection = null; // 连接
Session session; // 会话 接受或者发送消息的线程
Destination destination; // 消息的目的地
MessageConsumer messageConsumer; // 消息的消费者
// 实例化连接工厂
connectionFactory=new ActiveMQConnectionFactory(Consumer.USERNAME, Consumer.PASSWORD, Consumer.BROKEURL);
try {
connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
connection.start(); // 启动连接
session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session Boolean.FALSE不需要加事务 Session.AUTO_ACKNOWLEDGE 确认方式
destination=session.createQueue("FirstQueue1"); // 创建连接的消息队列
messageConsumer=session.createConsumer(destination); // 创建消息消费者
while(true){ //输出接收到的消息
TextMessage textMessage=(TextMessage)messageConsumer.receive(1000);
if(textMessage!=null){
System.out.println("收到的消息:"+textMessage.getText());
}else{
break;
}
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
2.2 基于pub/sub方式的实现
public class TopicProducer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
private static final String BROKEURL = "failover://"+"tcp://10.1.10.104:61616"; // 默认的连接地址
private static final int SENDNUM = 1000; // 发送的消息数量
public static void main(String[] args) {
ConnectionFactory connectionFactory; // 连接工厂
Connection connection = null; // 连接
Session session; // 会话 接受或者发送消息的线程
Destination destination; // 消息的目的地
MessageProducer messageProducer; // 消息生产者
// 实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(TopicProducer.USERNAME,
TopicProducer.PASSWORD, TopicProducer.BROKEURL);
try {
connection = connectionFactory.createConnection(); // 通过连接工厂获取连接
connection.start(); // 启动连接
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE); // 创建Session
destination = session.createTopic("FirstTopic1"); // 创建消息队列
messageProducer = session.createProducer(destination); // 创建消息生产者
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 设置持久化传输参数: NON_PERSISTENT = 1,PERSISTENT = 2;
sendMessage(session, messageProducer); // 发送消息
session.commit();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
/**
* 发送消息
* @param session
* @param messageProducer
* @throws Exception
*/
public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{
for(int i=0;i<TopicProducer.SENDNUM;i++){
TextMessage message=session.createTextMessage("ActiveMQ 发送的消息"+i);
System.out.println("发送消息:"+"ActiveMQ 发送的消息"+i);
messageProducer.send(message);
}
}
}
public class TopicConsumer {
private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
private static final String BROKEURL= "failover://"+"tcp://10.1.10.104:61616"; // 默认的连接地址
public static void main(String[] args) {
ConnectionFactory connectionFactory; // 连接工厂
Connection connection = null; // 连接
Session session; // 会话 接受或者发送消息的线程
Destination destination; // 消息的目的地
MessageConsumer messageConsumer; // 消息的消费者
// 实例化连接工厂
connectionFactory=new ActiveMQConnectionFactory(TopicConsumer.USERNAME, TopicConsumer.PASSWORD, TopicConsumer.BROKEURL);
try {
connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
connection.start(); // 启动连接
session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session Boolean.FALSE不需要加事务
destination=session.createTopic("FirstTopic1"); // 创建连接的消息队列
messageConsumer=session.createConsumer(destination); // 创建消息消费者
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("Received message: " + tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}