专业的编程技术博客社区

网站首页 > 博客文章 正文

RocketMQ之Broker(三)(rocketmq broker作用)

baijin 2024-08-15 17:04:13 博客文章 17 ℃ 0 评论

介绍完了broker的启动和broker处理producer的消息,接下来就会说说当broker组装完producer发送过来的消息是如何进行存储的。

在SendMessageProcessor#sendMessage方法中:putMessageResult =

this.brokerController.getMessageStore().putMessage(msgInner);

该方法最终会进入:

org.apache.rocketmq.store.DefaultMessageStore#putMessage方法中

在该方法中会进行一系列的校验,当校验通过之后会调用方法:PutMessageResult result = this.commitLog.putMessage(msg);

commitLog.putMessage方法很长,就不贴出来了,说一下这个方法的主要步骤:

1、 获取sysFlag字段

2、根据获取的sysFlag字段判断是否是非事务消息或者是提交事务类型的消息

3、获取mappedFile文件

4、加锁

5、对第三步获取的mappedFile文件进行判断

6、将消息添加到mappedFile中

7、释放锁

8、主从同步

9、高可用主从复制

①和②主要是判断消息的延迟级别

// 1. 获取sysFlag字段,判断是否是非事务性或者提交事务消息
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
// 2. 如果消息类型不是事务消息或者是commitLog

if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
    || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
    // Delay Delivery
    // 如果消息的延迟级别大于0,将消息的原主题名称与原消息队列ID存入消息属性中,用延迟消息主题SCHEDULE_TOPIC、消息队列ID
    // 更新原先消息的主题与队列
    if (msg.getDelayTimeLevel() > 0) {
        if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
            msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
        }

        topic = ScheduleMessageService.SCHEDULE_TOPIC;
        queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

        // Backup real topic, queueId
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

        msg.setTopic(topic);
        msg.setQueueId(queueId);
    }
}

如果消息的延迟级别大于0,将消息的原主题名称和原消息队列ID存入消息属性中,用延迟主题SCHEDULE_TOPIC、消息队列ID更新原先消息的主题与队列。这个主要是和消息的重试有关,消息重试和consumer有关,会放在consumer中说

③:获取mappedFile文件

从mappedFileQueue中获取最后一个mappedFile文件

public MappedFile getLastMappedFile() {
    MappedFile mappedFileLast = null;

    while (!this.mappedFiles.isEmpty()) {
        try {
            mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
            break;
        } catch (IndexOutOfBoundsException e) {
            //continue;
        } catch (Exception e) {
            log.error("getLastMappedFile has exception.", e);
            break;
        }
    }

    return mappedFileLast;
}

④:在写入mappedFile中之前进行加锁,说明将消息写入mappedFile是串行的

⑤:对第三步获取到的mappedFile文件进行判断

if (null == mappedFile || mappedFile.isFull()) {
    mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
    log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
    beginTimeInLock = 0;
    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}

⑥:将消息添加到mappedFile中

其实就是写入mappedFile中的mappedByteBuffer中,由后台服务线程定时写入到磁盘中

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;

    // 1. 获取当前写指针
    int currentPos = this.wrotePosition.get();

    if (currentPos < this.fileSize) {
        // 通过slice()方法创建一个与mappedFile的共享内存区
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();

        // 设置position为当前指针
        byteBuffer.position(currentPos);
        AppendMessageResult result = null;
        // 2. 判断消息类型是单个消息还是批量消息
        if (messageExt instanceof MessageExtBrokerInner) {
            // 3. 消息写入实现
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
        } else if (messageExt instanceof MessageExtBatch) {
            // 批量
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }

        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

这部分都是NIO的知识,关于这部分后续会整理出来。从上面的方法可以找出核心方法是doAppend。在doAppend中主要是做了commitLog文件中消息的组成。

这里拐个弯说下commitLog文件中每条的消息的组成,再去看doAppend代码就简单很多了。

一个commitLog文件默认大小是1G,该文件是由一条条的消息组成,每条消息的组成部分如下:


消息总长度(4) + 魔数(4) + 消息队列id(4) + 标记位(4) + 消息队列偏移量(8) + 物理偏移量(8) + 系统标记(4) + born存储时间戳(8) +Broker地址,包含端口(8)+ 存储时间戳(8) + 存储地址,包含端口(8) + 消息消费重试次数(4) + 4字节body长度和具体的消息内容(4) + 1字节topic长度与topic内容(1) + 2字节消息属性长度和具体的扩展属性(2)

  • TotalSize:表示的是该条消息的总长度,占4个字节
  • magicCode:魔数,这个字段标志着该commitLog是正常的还是空的。在后面介绍broker启动时文件的恢复(这部分在broker启动的时候没有说)时会说到这个
  • bodyCRC:CRC是循环冗余校验码
  • queueId:队列id,即消息发往哪个队列,这个是producer发送消息的时根据topic从NameSrv获取到的topic路由,路由中存在队列id
  • Flag:默认是0
  • QueueOffset:这个是consumeQueue文件中的偏移量,后续说到consumeQueue文件的写入时会说到
  • physicalOffset:物理位置,即该消息在在commitLog中的物理位置(需要注意的是,commitLog对应着磁盘上多个文件,这里的偏移量不是从某个文件开始算的,而是从第一个文件开始算起的,后面代码会看到
  • sysFlag:之前提到过几次,比如在producer发送的消息的时候有个判断是消息是否需要压缩。SysFlag是RocketMQ内部使用的标记位,通过位运算进行标记。例如是否是事务消息,初始值是0
  • bornTimestamp:这个是producer发送消息的时间
  • BornHost:broker地址+port
  • storeTimestamp:message在broker端上存储的时间
  • storeHostAddress:存储地址
  • ReconsumeTimes:重复消费次数,初始为0。consumer重试的时候,这个ReconsumeTimes就会加1,默认最大重试次数是16。
  • preparedTransactionOffset:事务消息相关的一个属性,RocketMQ事务消息基于两阶段提交
  • topic:消息主题
  • properties:消息属性,例如我们在发送消息的时候TAG就会放在properties中,还有messageId等等

说完了组成部分,接下来的代码就会围绕这些属性展开的,使用producer发送过来的所有信息组成上面的部分然后写进ByteBuf中!

在doAppend方法中:

1)long wroteOffset = fileFromOffset + byteBuffer.position();

这个是计算该条消息在commitLog文件中的物理偏移量

2)String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);

计算msgId,这个msgId和之前说的全局msgId不是一回事!这个msgId是offsetMsgId,这个是消息偏移id,该id记录了消息所在集群的物理地址。是由broker的IP(4字节)+port(4字节)+commitLog文件的物理偏移量(8字节)。一条消息可能存在多个offsetMsgId(如果消息重试的话,就会重新生成一个offsetMsgId),但只会存在一个msgId

3)queueOffset的组成

keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString();
// 2. 根据topic-queueId 获取该队列的偏移地址(待写入的地址),如果没有,新增一个键值对,当前偏移量位0.
Long queueOffset = CommitLog.this.topicQueueTable.get(key);

4)final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);

这个在上面介绍commitLog中一条消息的组成时说过了

以下是代码:

// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
this.msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
// 11 STORETIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
//this.msgBatchMemory.put(msgInner.getStoreHostBytes());
// 13 RECONSUMETIMES
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
    this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
    this.msgStoreItemMemory.put(propertiesData);

final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
// 将消息存储到byteBuffer中,并没有刷写到磁盘中
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

⑦:释放锁

⑧:刷盘操作(同步 or 异步)

⑨:高可用主从复制

第八和第九点放在后面整理,下篇文章会说下consumeQueue和indexFile两个文件的写入逻辑。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表