介绍完了broker的启动和broker处理producer的消息,接下来就会说说当broker组装完producer发送过来的消息是如何进行存储的。
在SendMessageProcessor#sendMessage方法中:putMessageResult =
this.brokerController.getMessageStore().putMessage(msgInner);
该方法最终会进入:
org.apache.rocketmq.store.DefaultMessageStore#putMessage方法中
在该方法中会进行一系列的校验,当校验通过之后会调用方法:PutMessageResult result = this.commitLog.putMessage(msg);
commitLog.putMessage方法很长,就不贴出来了,说一下这个方法的主要步骤:
1、 获取sysFlag字段
2、根据获取的sysFlag字段判断是否是非事务消息或者是提交事务类型的消息
3、获取mappedFile文件
4、加锁
5、对第三步获取的mappedFile文件进行判断
6、将消息添加到mappedFile中
7、释放锁
8、主从同步
9、高可用主从复制
①和②主要是判断消息的延迟级别
// 1. 获取sysFlag字段,判断是否是非事务性或者提交事务消息
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
// 2. 如果消息类型不是事务消息或者是commitLog
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
// 如果消息的延迟级别大于0,将消息的原主题名称与原消息队列ID存入消息属性中,用延迟消息主题SCHEDULE_TOPIC、消息队列ID
// 更新原先消息的主题与队列
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
如果消息的延迟级别大于0,将消息的原主题名称和原消息队列ID存入消息属性中,用延迟主题SCHEDULE_TOPIC、消息队列ID更新原先消息的主题与队列。这个主要是和消息的重试有关,消息重试和consumer有关,会放在consumer中说
③:获取mappedFile文件
从mappedFileQueue中获取最后一个mappedFile文件
public MappedFile getLastMappedFile() {
MappedFile mappedFileLast = null;
while (!this.mappedFiles.isEmpty()) {
try {
mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
break;
} catch (IndexOutOfBoundsException e) {
//continue;
} catch (Exception e) {
log.error("getLastMappedFile has exception.", e);
break;
}
}
return mappedFileLast;
}
④:在写入mappedFile中之前进行加锁,说明将消息写入mappedFile是串行的
⑤:对第三步获取到的mappedFile文件进行判断
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
⑥:将消息添加到mappedFile中
其实就是写入mappedFile中的mappedByteBuffer中,由后台服务线程定时写入到磁盘中
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
assert messageExt != null;
assert cb != null;
// 1. 获取当前写指针
int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {
// 通过slice()方法创建一个与mappedFile的共享内存区
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
// 设置position为当前指针
byteBuffer.position(currentPos);
AppendMessageResult result = null;
// 2. 判断消息类型是单个消息还是批量消息
if (messageExt instanceof MessageExtBrokerInner) {
// 3. 消息写入实现
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
// 批量
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
这部分都是NIO的知识,关于这部分后续会整理出来。从上面的方法可以找出核心方法是doAppend。在doAppend中主要是做了commitLog文件中消息的组成。
这里拐个弯说下commitLog文件中每条的消息的组成,再去看doAppend代码就简单很多了。
一个commitLog文件默认大小是1G,该文件是由一条条的消息组成,每条消息的组成部分如下:
消息总长度(4) + 魔数(4) + 消息队列id(4) + 标记位(4) + 消息队列偏移量(8) + 物理偏移量(8) + 系统标记(4) + born存储时间戳(8) +Broker地址,包含端口(8)+ 存储时间戳(8) + 存储地址,包含端口(8) + 消息消费重试次数(4) + 4字节body长度和具体的消息内容(4) + 1字节topic长度与topic内容(1) + 2字节消息属性长度和具体的扩展属性(2)
- TotalSize:表示的是该条消息的总长度,占4个字节
- magicCode:魔数,这个字段标志着该commitLog是正常的还是空的。在后面介绍broker启动时文件的恢复(这部分在broker启动的时候没有说)时会说到这个
- bodyCRC:CRC是循环冗余校验码
- queueId:队列id,即消息发往哪个队列,这个是producer发送消息的时根据topic从NameSrv获取到的topic路由,路由中存在队列id
- Flag:默认是0
- QueueOffset:这个是consumeQueue文件中的偏移量,后续说到consumeQueue文件的写入时会说到
- physicalOffset:物理位置,即该消息在在commitLog中的物理位置(需要注意的是,commitLog对应着磁盘上多个文件,这里的偏移量不是从某个文件开始算的,而是从第一个文件开始算起的,后面代码会看到)
- sysFlag:之前提到过几次,比如在producer发送的消息的时候有个判断是消息是否需要压缩。SysFlag是RocketMQ内部使用的标记位,通过位运算进行标记。例如是否是事务消息,初始值是0
- bornTimestamp:这个是producer发送消息的时间
- BornHost:broker地址+port
- storeTimestamp:message在broker端上存储的时间
- storeHostAddress:存储地址
- ReconsumeTimes:重复消费次数,初始为0。consumer重试的时候,这个ReconsumeTimes就会加1,默认最大重试次数是16。
- preparedTransactionOffset:事务消息相关的一个属性,RocketMQ事务消息基于两阶段提交
- topic:消息主题
- properties:消息属性,例如我们在发送消息的时候TAG就会放在properties中,还有messageId等等
说完了组成部分,接下来的代码就会围绕这些属性展开的,使用producer发送过来的所有信息组成上面的部分然后写进ByteBuf中!
在doAppend方法中:
1)long wroteOffset = fileFromOffset + byteBuffer.position();
这个是计算该条消息在commitLog文件中的物理偏移量
2)String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
计算msgId,这个msgId和之前说的全局msgId不是一回事!这个msgId是offsetMsgId,这个是消息偏移id,该id记录了消息所在集群的物理地址。是由broker的IP(4字节)+port(4字节)+commitLog文件的物理偏移量(8字节)。一条消息可能存在多个offsetMsgId(如果消息重试的话,就会重新生成一个offsetMsgId),但只会存在一个msgId
3)queueOffset的组成
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString();
// 2. 根据topic-queueId 获取该队列的偏移地址(待写入的地址),如果没有,新增一个键值对,当前偏移量位0.
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
4)final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
这个在上面介绍commitLog中一条消息的组成时说过了
以下是代码:
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
this.msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
// 11 STORETIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
//this.msgBatchMemory.put(msgInner.getStoreHostBytes());
// 13 RECONSUMETIMES
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.msgStoreItemMemory.put(propertiesData);
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
// 将消息存储到byteBuffer中,并没有刷写到磁盘中
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
⑦:释放锁
⑧:刷盘操作(同步 or 异步)
⑨:高可用主从复制
第八和第九点放在后面整理,下篇文章会说下consumeQueue和indexFile两个文件的写入逻辑。
本文暂时没有评论,来添加一个吧(●'◡'●)