有别于其他消息中间件由broker做负载均衡并主动向consumer投递消息,RocketMq是基于拉模式拉取消息,consumer做负载均衡并通过长轮询向broker拉消息。
Consumer消费拉取的消息的方式有两种
1. Push方式:rocketmq已经提供了很全面的实现,consumer通过长轮询拉取消息后回调MessageListener接口实现完成消费,应用系统只要MessageListener完成业务逻辑即可 2. Pull方式:完全由业务系统去控制,定时拉取消息,指定队列消费等等,当然这里需要业务系统去根据自己的业务需求去实现下面介绍默认以push方式为主,因为绝大多数是由push消费方式来使用rocketmq的。
消费端负载均衡
消费端会通过RebalanceService线程,10秒钟做一次基于topic下的所有队列负载
消费端遍历自己的所有topic,依次调rebalanceByTopic 根据topic获取此topic下的所有queue 选择一台broker获取基于group的所有消费端(有心跳向所有broker注册客户端信息) 选择队列分配策略实例AllocateMessageQueueStrategy执行分配算法,获取队列集合Set<MessageQueue>mqSet 1) 平均分配算法,其实是类似于分页的算法 将所有queue排好序类似于记录 将所有消费端consumer排好序,相当于页数 然后获取当前consumer所在页面应该分配到的queue 2) 按照配置来分配队列, 也就是说在consumer启动的时候指定了queue 3) 按照机房来配置队列 Consumer启动的时候会指定在哪些机房的消息 获取指定机房的queue 然后在执行如1)平均算法 根据分配队列的结果更新ProccessQueueTable<MessageQueue,ProcessQueue> 1) 比对mqSet 将多余的队列删除,当broker当机或者添加,会导致分配到mqSet变化, a) 将不在被本consumer消费的messagequeue的ProcessQueue删除,其实是设置ProcessQueue的droped属性为true b) 将超过两份中没有拉取动作ProcessQueue删除 //TODO 为什么要删除掉,两分钟后来了消息怎么办? // 2) 添加新增队列,比对mqSet,给新增的messagequeue 构建长轮询对象PullRequest对象,会从broker获取消费的进度 构建这个队列的ProcessQueue 将PullRequest对象派发到长轮询拉消息服务(单线程异步拉取) 注:ProcessQueue正在被消费的队列, (1) 长轮询拉取到消息都会先存储到ProcessQueue的TreeMap<Long, MessageExt>集合中,消费调后会删除掉,用来控制consumer消息堆积, TreeMap<Long, MessageExt> key是消息在此ConsumeQueue队列中索引 (2) 对于顺序消息消费处理 locked属性:当consumer端向broker申请锁队列成功后设置true,只有被锁定的processqueue才能被执行消费 rollback: 将消费在msgTreeMapTemp中的消息,放回msgTreeMap重新消费 commit: 将临时表msgTreeMapTemp数据清空,代表消费完成,放回最大偏移值 (3) 这里是个TreeMap,对key即消息的offset进行排序,这个样可以使得消息进行顺序消费
Rocketmq的消息是由consumer端主动到broker拉取的, consumer向broker发送拉消息请求, PullMessageService服务通过一个线程将阻塞队列LinkedBlockingQueue<PullRequest>中的PullRequest到broker拉取消息
DefaultMQPushConsumerImpl的pullMessage(pullRequest)方法执行向broker拉消息动作 1. 获取ProcessQueue判读是否drop的, drop为true返回 2. 给ProcessQueue设置拉消息时间戳 3. 流量控制,正在消费队列中消息(未被消费的)超过阀值,稍后在执行拉消息 4. 流量控制,正在消费队列中消息的跨度超过阀值(默认2000),稍后在消费 5. 根据topic获取订阅关系 6. 构建拉消息回调对象PullBack, 从broker拉取消息(异步拉取)返回结果是回调 7. 从内存中获取commitOffsetValue //TODO 这个值跟pullRequest.getNextOffset区别 8. 构建sysFlag pull接口用到的flag 9. 调底层通信层向broker发送拉消息请求 如果master压力过大,会建议去slave拉取消息 如果是到broker拉取消息清楚实时提交标记位,因为slave不允许实时提交消费进度,可以定时提交 //TODO 关于master拉消息实时提交指的是什么? 10. 拉到消息后回调PullCallback 处理broker返回结果pullResult 更新从哪个broker(master 还是slave)拉取消息 反序列化消息 消息过滤 消息中放入队列最大最小offset,方便应用来感知消息堆积度 将消息加入正在处理队列ProcessQueue 将消息提交到消费消息服务ConsumeMessageService 流控处理, 如果pullInterval参数大于0 (拉消息间隔,如果为了降低拉取速度,可以设置大于0的值),延迟再执行拉消息, 如果pullInterval为0立刻在执行拉消息动作序列图
1. 向broker发送长轮询请求
2. Broker接收长轮询请求
3. Consumer接收broker响应
长轮询活动图:
一张图画不下,再来一张
通过长轮询拉取到消息后会提交到消息服务ConsumeMessageConcurrentlyService,
ConsumeMessageConcurrentlyServic的submitConsumeRequest方法构建ConsumeRequest任务提交到线程池。
长轮询向broker拉取消息是批量拉取的, 默认设置批量的值为pullBatchSize= 32,可配置
消费端consumer构建一个消费消息任务ConsumeRequest消费一批消息的个数是可配置的consumeMessageBatchMaxSize = 1, 默认批量个数为一个
ConsumeRequest 任务run方法执行
判断proccessQueue是否被droped的, 废弃直接返回,不在消费消息 构建并行消费上下文 给消息设置消费失败时候的retrytopic,当消息发送失败的时候发送到topic为%RETRY%groupname的队列中 调MessageListenerConcurrently监听器的consumeMessage方法消费消息,返回消费结果 如果ProcessQueue的droped为true,不处理结果,不更新offset, 但其实这里消费端是消费了消息的,这种情况感觉有被重复消费的风险 处理消费结果 消费成功, 对于批次消费消息,返回消费成功并不代表所有消息都消费成功,但是消费消息的时候一旦遇到消费消息失败直接放回,根据ackIndex来标记成功消费到哪里了 消费失败, ackIndex设置为-1 广播模式发送失败的消息丢弃, 广播模式对于失败重试代价过高,对整个集群性能会有较大影响,失败重试功能交由应用处理 集群模式, 将消费失败的消息一条条的发送到broker的重试队列中去,如果此时还有发送到重试队列发送失败的消息,那就在cosumer的本地线程定时5秒钟以后重试重新消费消息,在走一次上面的消费流程。 删除正在消费的队列processQueue中本次消费的消息,放回消费进度 更新消费进度,这里只是一个内存offsettable的更新,后面有定时任务更新到broker上去