专业的编程技术博客社区

网站首页 > 博客文章 正文

RocketMQ之Broker(四)(rocketmq的broker)

baijin 2024-08-15 17:03:48 博客文章 17 ℃ 0 评论

说完了commitLog文件的写入,接下来就说下consumeQueue和indexFile两个文件的写入。

在上一篇文章中的相关代码中没有看到相关consumeQueue和indexFile文件写入的代码,那么是在什么地方写入的呢?再说具体代码之前先说下这两个文件的组成部分

consumeQueue的组成

通过上篇文章我们知道所有的message都是存在commitLog文件中的。而消费者是根据发布订阅模式进行消费,根据topic进行筛选对自己可用的数据,如果从commitLog文件中筛选的话,那消息肯定是相当的慢,所以就有了consumeQueue。

从上图可以看到consumeQueue目录的组成,首先是topic名称,在topic名称下还有队列,在队列下是具体的文件。如果从这个文件中查找对应的消息索引然后再去commitLog文件中获取message效率就会快很多。

在consumeQueue文件中每条数据的组成信息如下:

8字节的消息偏移量+4字节的长度+8字节的tagHashCode


单个consumeQueue文件默认包含30w个条目,每个条目由固定的20字节组成。那么每个consumeQueue文件的大小就是30w*20字节。可以将这个文件看成是由条目组成的数组,下标是consumeQueue的逻辑偏移量。

indexFile的组成

除了上面构建consumeQueue当做commitLog的索引文件,还有一种就是构建hash索引文件IndexFile


上图是indexFile索引文件,从图中可以看出由三部分组成:indexHead、hash槽、index条目。

  • indexHead:由40个字节组成:8字节的消息最小存储时间+8字节的消息最大存储时间+8字节的开始物理偏移量+8字节的结束物理偏移量+4字节的hash槽数目+4字节的index条目数量
  • hash槽:一个indexFile文件默认包含500w个hash槽,每个hash槽存储的是落在该hash槽的hashCode最新的index条目的索引
  • index条目:一个indexFile文件默认包含2000w个index条目,每个index条目有20个字节组成:4字节的hashCode+8字节的消息物理便宜量+4字节的该消息存储时间和第一条消息存储时间差+4字节的前一个index索引(这个是用来标识hash冲突的)

上面所说的hash冲突是如何产生的呢?

比如不同不同key的hashCode一致或者不同key算出来的不同hashCode值对hash槽取余得到的值一致,这时候就出现了hash冲突,IndexFile是如何解决的呢?这时候preIndexNo就起到作用了。

从上面我们知道hash槽中存储的是该hashCode所对应的最新的Index条目下标,新的Index条目的最后4个字节存储的是该HashCode上一个条目的Index条目下标。这样的话是不是就解决了!如果hash槽中的值为0或者大于IndexFile最大条目或者小于-1,说明该hash槽中没有对应的Index条目。

通过上面的索引构建就可以快速定位到commitLog对应的消息。为什么?

我们在producer发送消息的时候会设置全局唯一的msgId。在消息发送的代码中:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl

if (!(msg instanceof MessageBatch)) {
    MessageClientIDSetter.setUniqID(msg);
}

设置唯一id,在setUniqID方法中:

public static void setUniqID(final Message msg) {
    if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
        msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());
    }
}

将生成的这个唯一id放到了msg的property中。后续会在IndexFile#putKey方法中看到这个

consumeQueue和IndexFile文件的写入

说完了这两个文件的组成,回到文章开头的问题:在什么地方将数据写入这两个地方的呢?

在broker的第一篇文章中说到broker的启动分为三部分,其中在start部分中有这么一段代码:

if (this.getMessageStoreConfig().isDuplicationEnable()) {
    this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
} else {
    this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
}
this.reputMessageService.start();

其中reputMessageService就是将数据写入consumeQueue和IndexFile中的。

这个if是判断从哪个位置开始提取消息写入这两个文件中。isDuplicationEnable:是否允许重复转发。如果允许重复转发,将reputFromOffset设置为commitLog的提交指针,否则将reputFromOffset设置为最大偏移量

ReputMessageService继承了ServiceThread,ServiceThread实现了Runnable。所以ReputMessageService是一个异步线程,那么this.reputMessageService.start();就要看这个类的run方法:

public void run() {
    DefaultMessageStore.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            Thread.sleep(1);
            this.doReput();
        } catch (Exception e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    DefaultMessageStore.log.info(this.getServiceName() + " service end");
}

这个方法很简单,每睡眠1毫秒就调用一次reput方法。这个方法代码比较多,不一次性贴出来了,一点点说:

SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);

这个很简单:从commitLog文件中取出数据,位置就是上面说的reputFromOffset。得到的数据放在SelectMappedBufferResult中

DispatchRequest dispatchRequest =
    DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);

如果存在数据,构建DispatchRequest对象,这个对象就是后续consumeQueue和IndexFile文件数据的基础:比如topic、queueId、keys、uniqKey等等

DefaultMessageStore.this.doDispatch(dispatchRequest);

转发请求,分为两个:CommitLogDispatcherBuildConsumeQueue 和 CommitLogDispatcherBuildIndex

  • CommitLogDispatcherBuildConsumeQueue
public void dispatch(DispatchRequest request) {
    final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
    switch (tranType) {
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            DefaultMessageStore.this.putMessagePositionInfo(request);
            break;
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            break;
    }
}

构建consumeQueue,将数据写入consumeQueue:

public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    cq.putMessagePositionInfoWrapper(dispatchRequest);
}

findConsumeQueue方法根据topic和queueId查找到对应的consumeQueue文件;将数据写入consumeQueue。

将数据写入consumeQueue文件:

1)首先判断consumeQueue文件是否可写入

2)将commitLog的偏移量+msgSize+tagHashCode写入consumeQueue

3)更新checkPoint文件

  • CommitLogDispatcherBuildIndex

构建IndexFile文件,思路和consumeQueue文件大体相同,只不过组成不同

public void dispatch(DispatchRequest request) {
    if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
        DefaultMessageStore.this.indexService.buildIndex(request);
    }
}

indexService.buildIndex(request);

public void buildIndex(DispatchRequest req) {
    // 1. 获取或创建当前写入的IndexFile
    IndexFile indexFile = retryGetAndCreateIndexFile();
    if (indexFile != null) {
        long endPhyOffset = indexFile.getEndPhyOffset();
        DispatchRequest msg = req;
        String topic = msg.getTopic();
        String keys = msg.getKeys();
        // 2. 如果 indexFile 的最大偏移量大于该消息的 commitLog offset,忽略本次构建
        if (msg.getCommitLogOffset() < endPhyOffset) {
            return;
        }

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                break;
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                return;
        }

        /*
            下面3和4步骤中的uniq_key 和 keys
            这两个存放在消息的propertisMap中
            keys:用户在发送消息的时候可以指定,多个key用","分割
            uniqKey:消息唯一键,消息id并不是唯一的,因为如果消息消费重试时,发送的消息id和原来是一致的
         */

        // 3. 将uniqKeys 写入 indexFile
        if (req.getUniqKey() != null) {
            indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
            if (indexFile == null) {
                log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                return;
            }
        }

        // 4. 将keys 写入 indexFile
        if (keys != null && keys.length() > 0) {
            String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
            for (int i = 0; i < keyset.length; i++) {
                String key = keyset[i];
                if (key.length() > 0) {
                    indexFile = putKey(indexFile, msg, buildKey(topic, key));
                    if (indexFile == null) {
                        log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                        return;
                    }
                }
            }
        }
    } else {
        log.error("build index error, stop building index");
    }
}

看上面代码的第三步就出现了uniqKey,这个key就是producer发送消息生成的唯一key

以上就是consumeQueue和IndexFile文件的同步。下一篇文章将broker的其他一些小点整理一下然后就进入consumer的篇章了。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表