RocketMQ是阿里开源的一款高性能高吞吐的消息中间件,我们来研究下它是如何实现的,重点关注索引。
我们拿一个执行用例来测试,代码如下:
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.rocketmq.example.quickstart; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.common.RemotingHelper; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { SimpleDateFormat time=new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); final DefaultMQProducer producer = new DefaultMQProducer("Producer"); producer.setNamesrvAddr("localhost:9876"); producer.start(); final int num = 2; for (int i = 0; i < 1; i++) { try { Message msg = new Message("Topic1" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + num + time.format(new Date()) + " " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); msg.putUserProperty("psly", "psly"); producer.send(msg, new MessageQueueSelector(){ @Override public MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg){ System.out.println(arg); return mqs.get(((Integer) arg) % mqs.size()); } }, num); } catch (Exception e) { e.printStackTrace(); } } producer.shutdown(); } }
我们接着在DefaultMessageStore里面打个断点,然后执行以上用例。
可以看到代码进入了这个方法。
我们跟着它的执行,最后会看到它到了关键的doAppend方法。
这个位置会真正开始组织消息数据,并且保存到commit文件对应的内存映射里面。
那么具体来说,消息数据是如何格式化的呢?我们可以直接看calMsgLength方法,注释中详细说明了消息存储所占有的字节数:
我们可以重点关注其中的几个重要数据:
TOTALSIZE,作为消息的最字节数,作为第一个成员,4个字节数。它用于界定消息的边界。 BODYCRC,通过循环冗余校验来查看消息内容是否已出错。 QUEUEOFFSET,根据topic名称取得对应的Long(默认0-4),将来作为存储索引文件的目录。 PHYSICALOFFSET,用于消息在查找持久化(文件)之后在文件块(MappedFile)中的偏移位置。 然后消息格式化之后,接着又要干什么呢?一般来说此时内存里面已经有了这条消息,但是我们不知道消息何时会被消费,所以我们得持久化这条消息。也就是将将消息flush到文件上。而事实上我们已经构造的消息内存正是关联到一个文件的,截图如下: 那么我们所要做的就是去flush这个文件映射,从而确保消息保存到磁盘上。 另一方面,RocketMQ的索引设计采取的方式是 先格式化消息(计算此消息的总大小、topic名字、此key计算得到的queueoffset、放入磁盘中的偏移量等数据),然后放入消息块文件(比较大的文件,默认貌似1G)。一个线程异步地将上面构造的消息flush进硬盘一个线程将topic对应的physicaloffset放入索引的文件目录(内存映射)。physicaloffset用于从大块的文件存储中索引该消息。一个线程将上面构造的文件(含索引),刷新到硬盘中。 所以这里的消息将来怎么取得呢? 方式如下: 首先根据topic直接取得对应的topic目录。再根据key计算对应的queueoffset值,默认(0-4)。该目录下的文件内容(默认大小6000000个字节,5860KB)为消息对应commit大文件的索引,默认一个消息20(CQ_STORE_UNIT_SIZE)个字节。所以先取得 索引文件的内容(20个字节),然后根据其中的offset字段、总长度字段,去commit文件中取得真正的消息内容。(因为采用MappedByteBuffer来实现,所以以上的操作很可能不需要磁盘IO) 这里有个问题,为什么不为每个topic、queue建立一个文件来专门保存此类消息呢? 推测如下: 假如topic过多,会导致文件数量过多,且每个文件都保存着大量数据,不好维护。分成多个topic文件的方式,并不能提高IO的效率。可能会导致同时打开多个文件I/O。将消息内容都存在一个目录。这样读取和写入时只需要打开一个文件I/O,提高效率。然后将物理位置的索引放到对应的topic目录。这种方式可以理解为:一个重量级的目录+多个轻量级的索引目录。 最后我们来看看实现的代码: 从消息内容中提取字段,构造字段存入索引文件(仅在内存中构造ConsumeQueue,以及存入消息索引,很快),由ReputMessageService线程来完成。 由于需要快速响应给消费者,可以看到这里轮询的时间间隔非常短(Thread.sleep(1))。 将消息内容commit到磁盘上,由FlushRealTimeService线程来完成: 这里的interval稍微久点,默认500毫秒。因为刷一次硬盘比较昂贵,尽量一次多干点活。 将ReputMessageService产生的索引刷到对应的topic目录文件中,由FlushConsumeQueueService线程完成,代码如下: 由于前面的ReputMessageService线程已经将索引数据保存在内容中了,所以这里的磁盘操作轮询间隔interval也比较大,默认1000毫秒。 最后还有个重要的问题,这四类线程是如何协作工作的呢,看如图代码: SendMessageThread_*线程通过wrotePosition变量来通知ReputMessageService线程和FlushRealTimeService线程。FlushRealTimeService执行消息内容持久化,ReputMessageService执行构建消息索引的内存映射。这两者可同时进行。ReputMessageService完成任务之后,再次通过其对应的wrotePosition来通知FlushConsumeQueueService进行刷新索引的工作。代码如下: 所以这里的依赖如下: FlushRealTimeService 依赖于SendMessageThread_*,通过wrotePosition变量;ReputMessageService 依赖于SendMessageThread_*,通过wrotePosition变量; FlushConsumeQueueService 依赖于ReputMessageService,通过wrotePosition变量。 以上为索引与存储的服务设计。