本文主要介绍SpringBoot整合阿里云消息队列的使用
1.Maven依赖
<parent>
<groupId>org.springframework.boot
</groupId>
<artifactId>spring-boot-starter-parent
</artifactId>
<version>1.5.10.RELEASE
</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8
</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8
</project.reporting.outputEncoding>
<ons-client.version>1.7.8.Final
</ons-client.version>
<commons-lang3.version>3.4
</commons-lang3.version>
<hutool-all.version>3.0.9
</hutool-all.version>
<fastjson.version>1.2.47
</fastjson.version>
<java.version>1.8
</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot
</groupId>
<artifactId>spring-boot-starter
</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot
</groupId>
<artifactId>spring-boot-starter-web
</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot
</groupId>
<artifactId>spring-boot-starter-test
</artifactId>
</dependency>
<dependency>
<groupId>com.aliyun.openservices
</groupId>
<artifactId>ons-client
</artifactId>
<version>${ons-client.version}
</version>
</dependency>
<dependency>
<groupId>org.apache.commons
</groupId>
<artifactId>commons-lang3
</artifactId>
<version>${commons-lang3.version}
</version>
</dependency>
<dependency>
<groupId>com.xiaoleilu
</groupId>
<artifactId>hutool-all
</artifactId>
<version>${hutool-all.version}
</version>
</dependency>
<dependency>
<groupId>com.alibaba
</groupId>
<artifactId>fastjson
</artifactId>
<version>${fastjson.version}
</version>
</dependency>
</dependencies>
2.配置文件
server
.port
=9010
#阿里云 Access Key
aliyun
.mq
.accesskey
=xxxxxx
##阿里云 Access Key Secret
aliyun
.mq
.secretkey
=xxxxxx
#消息队列信息
aliyun
.mq
.normal
.topic
=My_Mq_Test_Topic_01
aliyun
.mq
.normal
.tag
=TagA
aliyun
.mq
.normal
.producerId
=PID_My_Mq_test_Producer_01
aliyun
.mq
.normal
.consumerId
=CID_My_Mq_Test_Topic_01
aliyun
.mq
.normal
.keyPrefix
=Mq_Test_01_
aliyun
.mq
.broadcast
.topic
=My_Mq_Test_Topic_02
aliyun
.mq
.broadcast
.tag
=TagA
aliyun
.mq
.broadcast
.producerId
=PID_My_Mq_test_Producer_02
aliyun
.mq
.broadcast
.consumerId
=CID_My_Mq_Test_Topic_02
aliyun
.mq
.broadcast
.keyPrefix
=Mq_Test_02_
#日志
logging
.level
.root
=WARN
logging
.level
.com
.boot
.aliware
.mq
=DEBUG
3.参数配置类
3.1阿里云账户信息
@Configuration
@ConfigurationProperties(prefix
= "aliyun.mq")
public class AliyunAccountConfig {
private String accesskey
;
private String secretkey
;
}
3.2 消息队列属性基础 properties
public class MqBaseProperties {
private String topic
;
private String tag
;
private String producerId
;
private String consumerId
;
private String keyPrefix
;
}
3.3普通消息队列属性配置类
@Configuration
@ConfigurationProperties(prefix
= "aliyun.mq.normal")
public class MqNormalParamConfig extends MqBaseProperties {
}
3.4广播消息队列属性配置类
@Configuration
@ConfigurationProperties(prefix
= "aliyun.mq.broadcast")
public class MqBroadcastParamConfig extends MqBaseProperties {
}
4.Mq配置
4.1 公共基础抽象配置
package com
.boot
.aliware
.mq
.config
;
import com
.aliyun
.openservices
.ons
.api
.MessageListener
;
import com
.aliyun
.openservices
.ons
.api
.PropertyKeyConst
;
import com
.aliyun
.openservices
.ons
.api
.PropertyValueConst
;
import com
.aliyun
.openservices
.ons
.api
.bean
.ConsumerBean
;
import com
.aliyun
.openservices
.ons
.api
.bean
.ProducerBean
;
import com
.aliyun
.openservices
.ons
.api
.bean
.Subscription
;
import com
.boot
.aliware
.mq
.config
.param
.AliyunAccountConfig
;
import org
.slf4j
.Logger
;
import org
.slf4j
.LoggerFactory
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.util
.ObjectUtils
;
import java
.util
.*
;
public class MqBaseConfig {
private static final Logger LOGGER
= LoggerFactory
.getLogger(MqBaseConfig
.class);
private final String STAR_FLOWER
= "*";
@Autowired
private AliyunAccountConfig mqParamProperties
;
protected ProducerBean
createProducer(String producerId
) {
ProducerBean producerBean
= new ProducerBean();
Properties properties
= this.createProducerProperties(producerId
);
producerBean
.setProperties(properties
);
LOGGER
.info("创建生产者参数 producerId={}}", producerId
);
return producerBean
;
}
protected ConsumerBean
createConsumer(String consumerId
, String consumeThreadNum
,
Map
<Subscription, MessageListener> subscriptionTable
) {
ConsumerBean consumerBean
= new ConsumerBean();
Properties properties
= this.createConsumerProperties(consumerId
, consumeThreadNum
);
consumerBean
.setProperties(properties
);
consumerBean
.setSubscriptionTable(subscriptionTable
);
LOGGER
.info("创建消费者参数 consumerId={}", consumerId
);
return consumerBean
;
}
protected ConsumerBean
createBbRoadCastConsumer(String consumerId
, String consumeThreadNum
,
Map
<Subscription, MessageListener> subscriptionTable
) {
ConsumerBean consumerBean
= new ConsumerBean();
Properties properties
= this.createConsumerProperties(consumerId
, consumeThreadNum
, true);
consumerBean
.setProperties(properties
);
consumerBean
.setSubscriptionTable(subscriptionTable
);
LOGGER
.info("创建消费者参数 consumerId={}", consumerId
);
return consumerBean
;
}
private Properties
createConsumerProperties(String consumerId
, String consumeThreadNum
) {
return this.createConsumerProperties(consumerId
, consumeThreadNum
, false);
}
private Properties
createConsumerProperties(String consumerId
, String consumeThreadNum
, boolean isBbRoadCast
) {
Properties properties
= this.buildBaseProperties();
properties
.setProperty(PropertyKeyConst
.ConsumerId
, consumerId
);
properties
.setProperty(PropertyKeyConst
.ConsumeThreadNums
, consumeThreadNum
);
if (isBbRoadCast
) {
LOGGER
.info("广播模式消费者 consumerId={}", consumerId
);
properties
.put(PropertyKeyConst
.MessageModel
, PropertyValueConst
.BROADCASTING
);
}
return properties
;
}
private Properties
createProducerProperties(String producerId
) {
Properties properties
= this.buildBaseProperties();
properties
.setProperty(PropertyKeyConst
.ProducerId
, producerId
);
return properties
;
}
protected Map
<Subscription, MessageListener> createSubscriptionTable(String topic
, MessageListener listeners
) {
return this.createSubscriptionTable(topic
, STAR_FLOWER
, listeners
);
}
protected Map
<Subscription, MessageListener> createSubscriptionTable(String topic
,
String tag
, MessageListener listeners
) {
Subscription subscription
= new Subscription();
subscription
.setTopic(topic
);
subscription
.setExpression(tag
);
LOGGER
.info("消费者创建 参数 subscription={}", subscription
);
Map
<Subscription, MessageListener> subscriptionTable
= new HashMap<>();
if (!ObjectUtils
.isEmpty(listeners
)) {
subscriptionTable
.put(subscription
, listeners
);
}
LOGGER
.info("消费者创建 参数 subscriptionTableSize={}", subscriptionTable
.size());
return subscriptionTable
;
}
private Properties
buildBaseProperties() {
Properties properties
= new Properties();
properties
.setProperty(PropertyKeyConst
.AccessKey
, mqParamProperties
.getAccesskey());
properties
.setProperty(PropertyKeyConst
.SecretKey
, mqParamProperties
.getSecretkey());
return properties
;
}
}
4.2 生产者配置
package com
.boot
.aliware
.mq
.config
;
import com
.aliyun
.openservices
.ons
.api
.bean
.ProducerBean
;
import com
.boot
.aliware
.mq
.config
.param
.MqBroadcastParamConfig
;
import com
.boot
.aliware
.mq
.config
.param
.MqNormalParamConfig
;
import org
.slf4j
.Logger
;
import org
.slf4j
.LoggerFactory
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.context
.annotation
.Bean
;
import org
.springframework
.context
.annotation
.Configuration
;
@Configuration
public class MqProducerConfig extends MqBaseConfig {
private static final Logger LOGGER
= LoggerFactory
.getLogger(MqProducerConfig
.class);
@Autowired
private MqNormalParamConfig normalParamConfig
;
@Autowired
private MqBroadcastParamConfig broadcastParamConfig
;
@Bean(name
= "normalProducer", initMethod
= "start", destroyMethod
= "shutdown")
public ProducerBean
normalProducer() {
ProducerBean producerBean
= this.createProducer(normalParamConfig
.getProducerId());
LOGGER
.info("{} 生产者创建完毕", "normalProducer");
return producerBean
;
}
@Bean(name
= "broadcastProducer", initMethod
= "start", destroyMethod
= "shutdown")
public ProducerBean
broadcastProducer() {
ProducerBean producerBean
= this.createProducer(broadcastParamConfig
.getProducerId());
LOGGER
.info("{} 生产者创建完毕", "broadcastProducer");
return producerBean
;
}
}
4.3 消费者配置
package com
.boot
.aliware
.mq
.config
;
import com
.aliyun
.openservices
.ons
.api
.MessageListener
;
import com
.aliyun
.openservices
.ons
.api
.bean
.ConsumerBean
;
import com
.aliyun
.openservices
.ons
.api
.bean
.Subscription
;
import com
.boot
.aliware
.mq
.config
.param
.MqBroadcastParamConfig
;
import com
.boot
.aliware
.mq
.config
.param
.MqNormalParamConfig
;
import com
.boot
.aliware
.mq
.listener
.BroadcastMessageListener
;
import com
.boot
.aliware
.mq
.listener
.NormalMessageListener
;
import org
.slf4j
.Logger
;
import org
.slf4j
.LoggerFactory
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.context
.annotation
.Bean
;
import org
.springframework
.context
.annotation
.Configuration
;
import java
.util
.Map
;
@Configuration
public class MqConsumerConfig extends MqBaseConfig {
private static final Logger LOGGER
= LoggerFactory
.getLogger(MqConsumerConfig
.class);
@Autowired
private MqNormalParamConfig normalParamConfig
;
@Autowired
private MqBroadcastParamConfig broadcastParamConfig
;
@Bean(name
= "normalConsumer01", initMethod
= "start", destroyMethod
= "shutdown")
public ConsumerBean
normalConsumerBer() {
LOGGER
.info("{} 消费者创建开始", "normalConsumer");
String consumeThreadNum
= "1";
Map
<Subscription, MessageListener> subscriptionTable
=
this.createSubscriptionTable(normalParamConfig
.getTopic(), new NormalMessageListener());
ConsumerBean consumerBean
=
this.createConsumer(normalParamConfig
.getConsumerId(), consumeThreadNum
, subscriptionTable
);
consumerBean
.setSubscriptionTable(subscriptionTable
);
LOGGER
.info("{} 消费者创建完毕", "normalConsumer");
return consumerBean
;
}
@Bean(name
= "broadcastConsumer", initMethod
= "start", destroyMethod
= "shutdown")
public ConsumerBean
broadcastConsumer() {
LOGGER
.info("{} 广播订阅消费者创建开始", "broadcastConsumer");
String consumeThreadNum
= "3";
Map
<Subscription, MessageListener> subscriptionTable
=
this.createSubscriptionTable(broadcastParamConfig
.getTopic(),
new BroadcastMessageListener());
LOGGER
.info("广播订阅消费者 size={}", subscriptionTable
.size());
ConsumerBean consumerBean
=
this.createBbRoadCastConsumer(broadcastParamConfig
.getConsumerId(), consumeThreadNum
, subscriptionTable
);
consumerBean
.setSubscriptionTable(subscriptionTable
);
LOGGER
.info("{} 广播订阅消费者创建完毕", "broadcastConsumer");
return consumerBean
;
}
}
5.消息发送
5.1 消息发送抽象父类接口
package com
.boot
.aliware
.mq
.service
.common
;
import com
.aliyun
.openservices
.ons
.api
.*
;
import com
.aliyun
.openservices
.ons
.api
.exception
.ONSClientException
;
import org
.slf4j
.Logger
;
import org
.slf4j
.LoggerFactory
;
public class MqSendAbstractService {
private Logger LOGGER
= LoggerFactory
.getLogger(MqSendAbstractService
.class);
public boolean send(Message msg
, Producer currentProducer
) {
try {
SendResult sendResult
= currentProducer
.send(msg
);
assert sendResult
!= null
;
LOGGER
.info("列发送消息成功 sendResult={}", sendResult
);
return true;
} catch (ONSClientException e
) {
System
.out
.println("发送失败");
LOGGER
.info("消息发送失败 ", e
);
return false;
}
}
5.2消息发送接口实现
package com
.boot
.aliware
.mq
.service
.impl
;
import com
.aliyun
.openservices
.ons
.api
.Message
;
import com
.aliyun
.openservices
.ons
.api
.Producer
;
import com
.boot
.aliware
.mq
.config
.param
.MqBroadcastParamConfig
;
import com
.boot
.aliware
.mq
.config
.param
.MqNormalParamConfig
;
import com
.boot
.aliware
.mq
.service
.common
.MqSendAbstractService
;
import com
.boot
.aliware
.mq
.service
.MqSendService
;
import com
.boot
.aliware
.mq
.util
.DateUtil
;
import org
.slf4j
.Logger
;
import org
.slf4j
.LoggerFactory
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.beans
.factory
.annotation
.Qualifier
;
import org
.springframework
.stereotype
.Service
;
import java
.util
.Date
;
@Service
public class MqSendServiceImpl extends MqSendAbstractService implements MqSendService {
private Logger LOGGER
= LoggerFactory
.getLogger(MqSendServiceImpl
.class);
@Autowired
@Qualifier("normalProducer")
private Producer normalProducer
;
@Autowired
@Qualifier("broadcastProducer")
private Producer broadcastProducer
;
@Autowired
private MqNormalParamConfig normalParamConfig
;
@Autowired
private MqBroadcastParamConfig broadcastParamConfig
;
@Override
public void sendNormalMess(String mess
) {
LOGGER
.info("发送普通消息开始");
Message msg
= new Message(normalParamConfig
.getTopic(),
normalParamConfig
.getTag(),
mess
.getBytes());
LOGGER
.info("普通消息 msg={}", msg
);
msg
.setKey(normalParamConfig
.getKeyPrefix().concat(mess
));
this.send(msg
, normalProducer
);
}
@Override
public void sendBroadcastMess(String mess
) {
LOGGER
.info("发送广播消息开始");
Message msg
= new Message(broadcastParamConfig
.getTopic(),
broadcastParamConfig
.getTag(),
mess
.getBytes());
LOGGER
.info("广播消息 msg={}", msg
);
msg
.setKey(broadcastParamConfig
.getKeyPrefix().concat(mess
));
this.sendAsync(msg
, broadcastProducer
);
}
@Override
public void sendDelayMess(String mess
) {
Message msg
= new Message(normalParamConfig
.getTopic(),
normalParamConfig
.getTag(),
mess
.getBytes());
LOGGER
.info("发送延时消息开始");
msg
.setKey(normalParamConfig
.getKeyPrefix().concat(mess
));
long time
= DateUtil
.offsetMinute(new Date(), 1).getTime();
msg
.setStartDeliverTime(time
);
LOGGER
.info("延时消息 msg={}", msg
);
this.send(msg
, normalProducer
);
}
}
6.消息监听
6.1消息公共抽象父类监听
package com
.boot
.aliware
.mq
.listener
;
import com
.aliyun
.openservices
.ons
.api
.Message
;
import org
.apache
.commons
.lang3
.exception
.ExceptionUtils
;
import org
.slf4j
.Logger
;
import org
.slf4j
.LoggerFactory
;
import java
.io
.UnsupportedEncodingException
;
public class MqBaseListener {
public static final Logger LOGGER
= LoggerFactory
.getLogger(MqBaseListener
.class);
private Integer FIVE_TIMES
= 5;
public String
getStringMess(Message message
) {
String msg
= null
;
try {
msg
= new String(message
.getBody(), "utf-8");
} catch (UnsupportedEncodingException e
) {
String stack
= ExceptionUtils
.getMessage(e
);
LOGGER
.info("消息监听->[获取消息体异常] stack={}", stack
);
}
return msg
;
}
public Boolean
canRetryFiveTimes(int runTime
) {
return this.canRetryTimes(runTime
, FIVE_TIMES
);
}
public Boolean
canRetryTimes(int runTime
, int retryTimes
) {
if (runTime
< retryTimes
) {
return true;
}
return false;
}
}
6.2 普通消息队列监听示例
package com
.boot
.aliware
.mq
.listener
;
import com
.aliyun
.openservices
.ons
.api
.Action
;
import com
.aliyun
.openservices
.ons
.api
.ConsumeContext
;
import com
.aliyun
.openservices
.ons
.api
.Message
;
import com
.aliyun
.openservices
.ons
.api
.MessageListener
;
import com
.boot
.aliware
.mq
.util
.DateUtil
;
import org
.apache
.commons
.lang3
.exception
.ExceptionUtils
;
import org
.slf4j
.Logger
;
import org
.slf4j
.LoggerFactory
;
public class NormalMessageListener extends MqBaseListener implements MessageListener {
public static final Logger LOGGER
= LoggerFactory
.getLogger(NormalMessageListener
.class);
@Override
public Action
consume(Message message
, ConsumeContext consumeContext
) {
LOGGER
.info("进入普通消息队列监听 ");
LOGGER
.info("消息 id={},执行Host={}", message
.getMsgID(), message
.getBornHost());
LOGGER
.info("消息 Topic={},Tag={}", message
.getTopic(), message
.getTag());
LOGGER
.info("消息生成时间={}", DateUtil
.formatTimeStamp(message
.getBornTimestamp()));
LOGGER
.info("消息执行次数={}", message
.getReconsumeTimes());
String srtMsg
= this.getStringMess(message
);
if (null
== srtMsg
) {
return Action
.ReconsumeLater
;
}
boolean successFlg
= true;
try {
LOGGER
.info("此处模拟消息处理代码");
} catch (Exception e
) {
successFlg
= false;
String stack
= ExceptionUtils
.getMessage(e
);
LOGGER
.info("用户阅读分享记录消息->[消费处理异常] {}", stack
);
}
boolean canRetry
= this.canRetryFiveTimes(message
.getReconsumeTimes());
if (!successFlg
&& canRetry
) {
return Action
.ReconsumeLater
;
}
LOGGER
.debug("消息处理成功");
return Action
.CommitMessage
;
}
}
7. 消息生产测试用Controller
package com
.boot
.aliware
.mq
.controller
;
import com
.boot
.aliware
.mq
.service
.MqSendService
;
import org
.slf4j
.Logger
;
import org
.slf4j
.LoggerFactory
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.web
.bind
.annotation
.RequestMapping
;
import org
.springframework
.web
.bind
.annotation
.RequestParam
;
import org
.springframework
.web
.bind
.annotation
.RestController
;
@RestController
public class MessageProductController {
private Logger LOGGER
= LoggerFactory
.getLogger(MessageProductController
.class);
@Autowired
private MqSendService mqSendService
;
@RequestMapping("/send/normal/mess")
public String
sendNormalMess(@RequestParam("mess") String mess
) {
mqSendService
.sendNormalMess(mess
);
return "success";
}
@RequestMapping("/send/broadcast/mess")
public String
sendBroadcastMess(@RequestParam("mess") String mess
) {
mqSendService
.sendBroadcastMess(mess
);
return "success";
}
@RequestMapping("/send/many-normal/mess")
public String
sendManyNormalMess(@RequestParam("mess") String mess
) {
LOGGER
.info("批量发送消息测试开始了 ");
for (int i
= 0; i
< 10; i
++) {
mqSendService
.sendNormalMess(mess
.concat(String
.valueOf(i
)));
}
LOGGER
.info("批量发送消息测试完毕了 ");
return "success";
}
@RequestMapping("/send/delay/mess")
public String
sendDelayMess(@RequestParam("mess") String mess
) {
mqSendService
.sendDelayMess(mess
);
return "success";
}
}