在前一段时间线上出现过一次事故,一个非常重要的消息生产者服务,由于MQ出现问题,消息大量积压,导致了该服务线程被打满,外部请求返回502,服务采用Springboot搭建,使用Springboot的Tomcat容器。
消息的生产者服务是一个高并发量的服务,接受外部方的接口调用,并将消息推送至MQ,调用流程示意图如下:
而事故当天的情况是,MQ消息大量积压,基本等同于MQ挂掉,
大量的请求积压在推送消息到MQ的地方,导致外部的大量的请求在Tomcat的线程池积压,当Tomcat的线程池全部被打满后,服务不能再接受新的请求进入,导致抛出大量的502错误。
首先可以想到的是,在MQ发送消息处,设置推送消息的超时时间,超过超时时间,认为消息发送失败,将消息写入文件中,当时这个方案并没有根本上解决,如果MQ挂掉,Tomcat不被打满的问题,虽然可以解决目前的生产场景的情况,但是当后续请求量更大时候,不能保证Tomcat不被打满,同时,消息推送的超时时间的设定也不好进行把握,如果由于网络波动或其他情况,导致消息推送慢,但是是可以推送成功的,但是万一超过了超时时间,消息直接不会发送,反而会影响目前的业务逻辑。
事故的起因是因为Tomcat被打满,那调整Tomcat的线程池大小,调整大一些不就可以了吗?但是这是一种治标不治本的方法,并没有根本上解决MQ挂掉后,Tomcat被打满的情况,只能是延迟了被打满的时间,但是根据目前线上机器配置的情况,Tomcat增大线程池大小并不是一个合适的选择。
问题的根本是MQ挂掉,主线程全部卡在MQ发送消息的部分,那么是否可以考虑,将MQ发送消息的这个操作异步化,让Tomcat主线程不在此等待,而是转由异步线程执行发送消息的操作?这个方案看来还是比较靠谱的,这里我首先考虑引入线程池,进行异步化处理,
OK,方案确认,那么线程池的参数设置需要进行考虑,一般常规的线程池线程数设置为:CPU core * 2 +1,也有其他的线程池估算算法:估算线程池数目大小 ,这里我采用传统的设置方式,初始化线程池核心数为 CPU core * 2 +1,最大线程数:4 * (CPU core * 2 +1),阻塞队列:1000。 由于我们采用了线程池,那么对于线程池的监控是必须的,这里我设置为线程数达到最大线程数的80%会进行告警,因为这时候说明MQ推送消息可能已经出现堆积的情况了,下面给出代码的实例:
消息推送异步化:
@Component public class MessageProducer InitializingBean { @Autowired private MqService mqService; private static ThreadPoolExecutor pool = null; @Override public void sendToMessageBus(String message) { //线程池异步处理 try { pool.execute(() -> { try { //推送消息 mqService.send(message); monitorThreadPool(); } catch (Exception e) { log.error("send message to message bus error, cause : {}", e); handleFailMessage(message); } }); } catch (Exception e) { log.error("commit send message to thread pool error, prepare to save message in file......"); handleFailMessage(message); } } @Override public void afterPropertiesSet() throws Exception { pool = new ThreadPoolExecutor(5, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1000), new ThreadPoolExecutor.AbortPolicy()); //启动消息失败处理线程 FailMessageHandlerProducer republishThread = new FailMessageHandlerProducer(); republishThread.setName("republish-thread"); republishThread.start(); } public static void shutdown() { if (pool != null) { pool.shutdown(); } } private void handleFailMessage(String message) { JSONObject jsonObject = new JSONObject(); jsonObject.put("message", message); FailMessageHandlerProducer.pushEvent(jsonObject); } private void monitorThreadPool() { try { //一级告警,线程池当前活动线程数大于阈值 if (pool.getActiveCount() > 16) { //告警处理 } //二级告警,线程池阻塞队列当前对象数大于阈值 if (pool.getQueue().size() > 100) { //告警处理 } } catch (Exception e) { log.error("monitorThreadPool alarm error, cause : {}", e); } } }失败消息处理:
public class FailMessageHandlerProducer extends Thread { private static LinkedBlockingQueue<JSONObject> queue = new LinkedBlockingQueue<>(10000); private static volatile boolean terminate = false; private long timeout = 10; public FailMessageHandlerProducer() { } public static synchronized void pushEvent(JSONObject republishData) { try { queue.put(republishData); } catch (Exception e) { log.error("FailMessageHandlerProducer push error", e); } } @Override public void run() { while (!terminate) { try { JSONObject republishData = queue.poll(timeout, TimeUnit.SECONDS); if (republishData != null) { //写入文件 } else { //停顿2s Thread.sleep(2 * 1000); } } catch (Exception e) { log.error("republish message error, cause : {}", e); } } } public static void setTerminate() { terminate = false; } }这里我采用了线程池进行异步化发送消息,当MQ挂掉或者推送消息特别慢的时候,线程池中的线程首先会进行积压,直到线程池最大线程数,在之后进入的线程会进入阻塞队列,当阻塞队列被打满后,线程池会抛出异常,捕获异常后将消息写入文件。关于线程池的机制可以看一下我的另一篇博文:Java ThreadPoolExecutor线程池概述
压力测试的工具,我使用是Jmeter,Jmeter使用,在PC环境下的压力测试数据如下,环境Intel 八代i5 4核 + 16G,
条件并发请求次数每秒吞吐使用线程池(初始化5,最大线程数20,阻塞队列1000)1000并发/1s2000220.8使用线程池(初始化5,最大线程数20,阻塞队列1000)1000并发/1s2000257.2使用线程池(初始化5,最大线程数20,阻塞队列1000)1000并发/1s2000217.5不使用线程池1000并发/1s200016.6使用线程池(初始化50,最大线程数100,阻塞队列1000)2000并发/1s10000181使用线程池(初始化30,最大线程数80,阻塞队列1000)2000并发/1s10000159不使用线程池2000并发/1s600016.8可以看到,在高并发场景下,没有线程池的场景,吞吐量差距非常巨大,但是也可以看到,线程池的线程数并不是越大越大的,需要根据服务器的配置情况,设定好合适的线程池配置。
本文结合我自己遇见的一次线上事故,采取的容灾方案,这个方案肯定不是很完美或者或者说设计的很好的,因为当MQ挂掉后,很多消息会写入文件,将这部分消息重新处理也是一个比较麻烦的事情,本文就是一个抛砖引玉的给出一个大概思路,如果你有更好的方案,欢迎留言我们讨论!
