浅谈Rocketmq源码-消息存储(一)

脉络

image.png

Rocketmq消息存储涉及几个比较重要的文件,我们先来看看这几个文件

CommitLog:存储消息的元数据,每个CommitLog文件的大小默认为1G。文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1073741824,当第一个文件写满了,会创建第二个文件,名为00000000001073741824,起始偏移量为1073741824,以此类推

ConsumerQueue:消费队列,主要用于消费拉取消息、更新消费位点等所用的索引

Index:索引文件,提供了一种可以通过key或时间区间来查询消息的方法

abort:这个设计本人觉得比较巧妙,如果这个文件存在,表示Rocketmq非正常关闭,如果这个文件不存在,表示Rocketmq正常关闭

在Rocketmq中,CommitLog、ConsumerQueue、Index这些文件都被映射成存储对象MappedFile,消息到来时会先存储在CommitLog,ConsumeQueue 和 Index文件是通过ReputMessageService的异步线程根据CommitLog的数据对其进行更新

文件创建

当有一条消息过来,Broker首先要思考的是:这条消息应该存在哪个文件中,上文说到,这些文件叫做CommitLog,默认大小为1G,那么这些文件是什么时候创建的?Rockermq有一个文件预分配机制,当上一个CommitLog写满时,自然要获取下一个CommitLog,文件预分配机制使得获取下一个文件时,不需要等待文件创建,下面我们来看看Rocketmq的文件预分配机制是怎么做的

我们先看一个比较关键的类:AllocateMappedFileService,Broker启动的时候会初始化DefaultMessageStore,DefaultMessageStore初始化的时候会初始化AllocateMappedFileService,根据下图,可以看到,AllocateMappedFileService继承了ServiceThread,ServiceThread实现了Runnable接口,我们直接看这个类的run方法

image.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
kotlin复制代码public void run() {
   log.info(this.getServiceName() + " service started");
// while循环,服务不停止就一直调用mmapOperation这个方法
   while (!this.isStopped() && this.mmapOperation()) {

  }
   log.info(this.getServiceName() + " service end");
}

// 此线程不断循环调用的mmapOperation究竟做了什么操作
private boolean mmapOperation() {
   boolean isSuccess = false;
   AllocateRequest req = null;
   try {
     // 从优先队列中获取AllocateRequest
     // 如果requestQueue为空,会阻塞等待唤醒
     req = this.requestQueue.take();
     // 从ConcurrentMap requestTable中获取AllocateRequest
     AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
     // 校验
     // requestQueue和requestTable中的数据需一致
     if (null == expectedRequest) {
       log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
                + req.getFileSize());
       return true;
    }
     if (expectedRequest != req) {
       log.warn("never expected here, maybe cause timeout " + req.getFilePath() + " "
                + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
       return true;
    }
// 如果MappedFile为空,表示要创建MappedFile
     if (req.getMappedFile() == null) {
       // 记录开始创建MappedFile的时间
       long beginTime = System.currentTimeMillis();

       MappedFile mappedFile;
       // 判断是否开启isTransientStorePoolEnable,如果开启则使用直接内存写入数据,这个判断有三个条件
       // 1.开启transientStorePoolEnable配置
    // 2.异步输盘
    // 3.必须是Broker主节点
       // transientStorePoolEnable = true 时,mappedByteBuffer 只是用来读消息,堆外内存用来写消息,从而实现对于消息的读写分离
       if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
         try {
           mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
           mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
        } catch (RuntimeException e) {
           log.warn("Use default implementation.");
           mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
        }
      } else {
         // 使用 mmap 方式创建MappedFile
         mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
      }
// 计算创建MappedFile所用的时间
       long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime);
       // 如果超过10ms则打印告警日志
       if (elapsedTime > 10) {
         int queueSize = this.requestQueue.size();
         log.warn("create mappedFile spent time(ms) " + elapsedTime + " queue size " + queueSize
                  + " " + req.getFilePath() + " " + req.getFileSize());
      }

       // 满足这两个条件会进行文件预热
       // 1.配置了进行文件预热
       // 2.只有 CommitLog 才进行文件预热,所以MappedFile的文件大小需大于CommitLog的文件大小,CommitLog 的大小默认为1G
       // 文件预热会进行数据预写入,根据系统的 pageSize 对每个 pageSize 写入一个字节数据。
       if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
          .getMappedFileSizeCommitLog()
           &&
           this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
         mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                                   this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
      }

       req.setMappedFile(mappedFile);
       this.hasException = false;
       isSuccess = true;
    }
  }
......
  } finally {
     if (req != null && isSuccess)
       req.getCountDownLatch().countDown();
  }
   return true;
}

可以看到,mmapOperation不断循环就做了两件事

  1. 初始化MappedFile
  2. 预热MappedFile

这里有一个问题,为什么要进行文件预热?

要知道这个问题的答案,需要先了解一下Page Cache
Page Cache 是 OS 对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于 OS 使用 Page Cache 机制对读写访问操作进行了性能优化,将一部分的内存用作 Page Cache。对于数据的写入,OS 会先写入至 Cache 内,随后通过异步的方式由 pdflush 内核线程将 Cache 内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中 Page Cache 的情况,OS 从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。文件预热可以防止出现缺页中断,从磁盘重新加载数据到内存

以上这段解释来自 segmentfault.com/a/119000004…

创建的命令是来自优先队列,如果优先队列中没有AllocateRequest,会一直阻塞,那么AllocateRequest是从哪里放进去的

在将消息存储到CommitLog前,需要知道这些消息要存储到哪个CommitLog,所以会有一个获取写入文件的操作,如果此文件不存在,或者上一个文件已写满,便会创建MappedFile,AllocateRequest便是在此时放进优先队列中的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
kotlin复制代码public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
   long createOffset = -1;
// 获取将要写入的CommitLog对应的MappedFile
   MappedFile mappedFileLast = getLastMappedFile();
// 如果MappedFile为空,表示要进行创建
   if (mappedFileLast == null) {
     // 计算出新文件的起始偏移量(起始偏移量即文件名称)
     createOffset = startOffset - (startOffset % this.mappedFileSize);
  }
// 如果MappedFile写满了,同样要计算新文件的起始偏移量
   if (mappedFileLast != null && mappedFileLast.isFull()) {
     createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
  }

   if (createOffset != -1 && needCreate) {
     // 拼接文件名称
     String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
     String nextNextFilePath = this.storePath + File.separator
       + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
     MappedFile mappedFile = null;
// allocateMappedFileService 已初始化,创建下一个文件和下下个文件
     if (this.allocateMappedFileService != null) {
       mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                                                                                 nextNextFilePath, this.mappedFileSize);
    } else {
       // allocateMappedFileService 未初始化,直接创建文件
       try {
         mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
      } catch (IOException e) {
         log.error("create mappedFile exception", e);
      }
    }
// 将创建的 MappedFile 对象添加到 mappedFiles 列表中
     if (mappedFile != null) {
       if (this.mappedFiles.isEmpty()) {
         mappedFile.setFirstCreateInQueue(true);
      }
       this.mappedFiles.add(mappedFile);
    }
// 返回新创建的MappedFile
     return mappedFile;
  }

   return mappedFileLast;
}

提交请求的具体代码在AllocateMappedFileService类中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
kotlin复制代码public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
// 默认提交两个请求
   int canSubmitRequests = 2;
   if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
     if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
         && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
       canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
    }
  }

   AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
// 向ConcurrentHashMap中存放AllocateRequest对象,如果存放失败说明有别的线程已经创建相同的文件
   boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
// 存放成功,向requestQueue存放AllocateRequest对象
   if (nextPutOK) {
     // 异常处理
     if (canSubmitRequests <= 0) {
       log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
                "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
       this.requestTable.remove(nextFilePath);
       return null;
    }
     boolean offerOK = this.requestQueue.offer(nextReq);
     if (!offerOK) {
       log.warn("never expected here, add a request to preallocate queue failed");
    }
     canSubmitRequests--;
  }
// 相同的逻辑,上面是创建下一个文件,此处是创建下下个文件
   AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
   boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
   if (nextNextPutOK) {
     if (canSubmitRequests <= 0) {
       log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
                "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
       this.requestTable.remove(nextNextFilePath);
    } else {
       boolean offerOK = this.requestQueue.offer(nextNextReq);
       if (!offerOK) {
         log.warn("never expected here, add a request to preallocate queue failed");
      }
    }
  }

  ......

   AllocateRequest result = this.requestTable.get(nextFilePath);
   try {
     if (result != null) {
       // 等待下一个MappedFile文件创建完成
       boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
       if (!waitOK) {
         log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
         return null;
      } else {
         // 删除requestTable中对应的数据
         this.requestTable.remove(nextFilePath);
         return result.getMappedFile();
      }
    } else {
       log.error("find preallocate mmap failed, this never happen");
    }
  } catch (InterruptedException e) {
     log.warn(this.getServiceName() + " service has exception. ", e);
  }

   return null;
}

消息写入文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
kotlin复制代码public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
           PutMessageContext putMessageContext) {
   assert messageExt != null;
   assert cb != null;
   // 获取消息的写指针
   int currentPos = this.wrotePosition.get();
   // 当前写指针小于文件的大小,那就文件还没写满
   if (currentPos < this.fileSize) {
     // writeBuffer不为空,则取writeBuffer
     // 什么时候writeBuffer不为空呢?
     // 判断的根据为transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()
     //       && BrokerRole.SLAVE != getBrokerRole();
     // 1.开启transientStorePoolEnable配置
     // 2.异步输盘
     // 3.必须是Broker主节点
     ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
     byteBuffer.position(currentPos);
     AppendMessageResult result;
     // 处理单个消息
     if (messageExt instanceof MessageExtBrokerInner) {
       result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                            (MessageExtBrokerInner) messageExt, putMessageContext);
       // 处理批量消息
    } else if (messageExt instanceof MessageExtBatch) {
       result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                            (MessageExtBatch) messageExt, putMessageContext);
    } else {
       return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
     // 更新wrotePosition的位置
     this.wrotePosition.addAndGet(result.getWroteBytes());
     this.storeTimestamp = result.getStoreTimestamp();
     return result;
  }
   log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
   return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

doAppend是具体的写入文件逻辑,这个方法处理数据的方式比较复杂,本人水平有限,也没看懂,我们只看一个比较重要的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
kotlin复制代码public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
           final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
           

  ......

   // 确认是否有足够的空闲空间
// 如果超过,返回END_OF_FILE,在此方法的外面会处理这种场景
// 处理方式是创建一个新的文件存储消息
   if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
     this.msgStoreItemMemory.clear();
     this.msgStoreItemMemory.putInt(maxBlank);
     // BLANK_MAGIC_CODE表示一个CommitLog文件结尾魔法值,当设置成这个魔法值表示文件已写完
     this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
     final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
     byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
     return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
                                    maxBlank,
                                    msgIdSupplier, msgInner.getStoreTimestamp(),
                                    queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
  }

  ......
   return result;
}

未完待续

参考资料

segmentfault.com/a/119000004…

cloud.tencent.com/developer/a…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%