ActiveMQ 事务——遇到异常始终回滚
1. 启用消息事务:<property name="sessionTransacted" value="true"/> 2. 消息在被消费时,如果用户程序抛出 Exception,消息会回滚并重传(MQ 里未消费的消息数目不变)。ActiveMQ 默认最多重传 6 次(jms.redeliveryPolicy.maximumRedeliveries=6);超过 6 次后,即使仍然抛出异常,这条消息也会被视为已消费,不再回滚重传(默认会被转入死信队列 ActiveMQ.DLQ)。我们可以在 brokerURL 里设置 jms.redeliveryPolicy.maximumRedeliveries=-1:-1 表示无限次重传,0 表示不重传。注意:在 XML 配置文件中,URL 查询字符串里的 & 字符必须写成 HTML/XML 转义符 &amp;
3. 消息回滚的触发方式有两种:(1) 程序本身运行时抛出异常,比如数据库操作异常;(2) 程序主动检查结果,例如检查数据库更新条数,如果小于 1,就在代码里人为抛出一个 RuntimeException。
这两种情况都能触发消息的 rollback。
例如在 onMessage() 里,或者在它内部调用的方法里:
throw new RuntimeException("Update DB failed.");
最终配置文件如下
appContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <import resource="mybatisContext.xml" />
    <import resource="shardingContext.xml" />

    <!-- JMS connection factory (test environment). -->
    <!--
        The query-string separators MUST be written as &amp; inside an XML
        attribute; a raw ampersand makes this file not well-formed.
        jms.redeliveryPolicy.maximumRedeliveries=-1 means unlimited redelivery
        (0 would mean no redelivery).
    -->
    <bean id="jmsFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL"
                  value="failover:(tcp://10.0.0.9:61616)?randomize=false&amp;timeout=3000&amp;initialReconnectDelay=100&amp;jms.useAsyncSend=true&amp;jms.redeliveryPolicy.maximumRedeliveries=-1" />
    </bean>

    <!-- JMS template used for sending. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory">
            <!-- Share a single connection to avoid creating a connection per send. -->
            <bean class="org.springframework.jms.connection.SingleConnectionFactory">
                <property name="targetConnectionFactory">
                    <ref local="jmsFactory" />
                </property>
            </bean>
        </property>
    </bean>

    <!-- MQ listener container. -->
    <bean id="smsSendForMQListener"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory">
            <ref bean="jmsFactory" />
        </property>
        <property name="destinationName">
            <value>DistributePics</value>
        </property>
        <property name="messageListener">
            <ref bean="saveListener" />
        </property>
        <!-- Transacted session: if the listener throws, the message is rolled back. -->
        <property name="sessionTransacted" value="true" />
    </bean>

    <!-- Spring MVC component scanning. -->
    <context:component-scan base-package="com.x.service"></context:component-scan>
    <context:component-scan base-package="com.x.controller.**"></context:component-scan>
    <context:component-scan base-package="com.x.listener"></context:component-scan>
    <context:component-scan base-package="com.x.spring"></context:component-scan>
</beans>
java程序如下:
package com.x.imgapp.listener; import java.util.concurrent.Future; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.BytesMessage; import org.springframework.stereotype.Component; import com.x.imgapp.common.model.MsgBean; import com.x.imgapp.concurrent.AppExecutor; import com.x.imgapp.concurrent.MoveThread; import org.apache.activemq.command.ActiveMQBytesMessage; import org.apache.activemq.util.ByteSequence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @author frank.liu * */ @Component("saveListener") public class SaveDBListener implements MessageListener { private static final Logger logger = LoggerFactory.getLogger(SaveDBListener.class); private static Long receiveCount = 0L; @Override public void onMessage(Message message) { logger.info("Step into onMessage()------------------------------->"); ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message; ByteSequence sequence = msg.getContent(); String msgStr = new String(sequence.data); // // Future<Integer> future = AppExecutor.getExecutor().submit(new MoveThread(msgStr)); logger.info("receiveCount:{}", ++receiveCount); throw new RuntimeException("Update DB failed."); } }