脉络
Rocketmq消息存储涉及几个比较重要的文件,我们先来看看这几个文件
CommitLog:存储消息的元数据,每个CommitLog文件的大小默认为1G。文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1073741824,当第一个文件写满了,会创建第二个文件,名为00000000001073741824,起始偏移量为1073741824,以此类推
ConsumerQueue:消费队列,主要用于消费拉取消息、更新消费位点等所用的索引
Index:索引文件,提供了一种可以通过key或时间区间来查询消息的方法
abort:这个设计本人觉得比较巧妙,如果这个文件存在,表示Rocketmq非正常关闭,如果这个文件不存在,表示Rocketmq正常关闭
在Rocketmq中,CommitLog、ConsumerQueue、Index这些文件都被映射成存储对象MappedFile,消息到来时会先存储在CommitLog,ConsumeQueue 和 Index文件是通过ReputMessageService的异步线程根据CommitLog的数据对其进行更新
文件创建
当有一条消息过来,Broker首先要思考的是:这条消息应该存在哪个文件中,上文说到,这些文件叫做CommitLog,默认大小为1G,那么这些文件是什么时候创建的?Rockermq有一个文件预分配机制,当上一个CommitLog写满时,自然要获取下一个CommitLog,文件预分配机制使得获取下一个文件时,不需要等待文件创建,下面我们来看看Rocketmq的文件预分配机制是怎么做的
我们先看一个比较关键的类:AllocateMappedFileService,Broker启动的时候会初始化DefaultMessageStore,DefaultMessageStore初始化的时候会初始化AllocateMappedFileService,根据下图,可以看到,AllocateMappedFileService继承了ServiceThread,ServiceThread实现了Runnable接口,我们直接看这个类的run方法
1 | kotlin复制代码public void run() { |
可以看到,mmapOperation不断循环就做了两件事
- 初始化MappedFile
- 预热MappedFile
这里有一个问题,为什么要进行文件预热?
要知道这个问题的答案,需要先了解一下Page Cache
Page Cache 是 OS 对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于 OS 使用 Page Cache 机制对读写访问操作进行了性能优化,将一部分的内存用作 Page Cache。对于数据的写入,OS 会先写入至 Cache 内,随后通过异步的方式由 pdflush 内核线程将 Cache 内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中 Page Cache 的情况,OS 从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。文件预热可以防止出现缺页中断,从磁盘重新加载数据到内存
以上这段解释来自 segmentfault.com/a/119000004…
创建的命令是来自优先队列,如果优先队列中没有AllocateRequest,会一直阻塞,那么AllocateRequest是从哪里放进去的
在将消息存储到CommitLog前,需要知道这些消息要存储到哪个CommitLog,所以会有一个获取写入文件的操作,如果此文件不存在,或者上一个文件已写满,便会创建MappedFile,AllocateRequest便是在此时放进优先队列中的
1 | kotlin复制代码public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) { |
提交请求的具体代码在AllocateMappedFileService类中
1 | kotlin复制代码public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) { |
消息写入文件
1 | kotlin复制代码public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb, |
doAppend是具体的写入文件逻辑,这个方法处理数据的方式比较复杂,本人水平有限,也没看懂,我们只看一个比较重要的逻辑
1 | kotlin复制代码public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, |
未完待续
参考资料
cloud.tencent.com/developer/a…
本文转载自: 掘金