说完了commitLog文件的写入,接下来就说下consumeQueue和indexFile两个文件的写入。
在上一篇文章中的相关代码中没有看到相关consumeQueue和indexFile文件写入的代码,那么是在什么地方写入的呢?再说具体代码之前先说下这两个文件的组成部分
consumeQueue的组成
通过上篇文章我们知道所有的message都是存在commitLog文件中的。而消费者是根据发布订阅模式进行消费,根据topic进行筛选对自己可用的数据,如果从commitLog文件中筛选的话,那消息肯定是相当的慢,所以就有了consumeQueue。
从上图可以看到consumeQueue目录的组成,首先是topic名称,在topic名称下还有队列,在队列下是具体的文件。如果从这个文件中查找对应的消息索引然后再去commitLog文件中获取message效率就会快很多。
在consumeQueue文件中每条数据的组成信息如下:
8字节的消息偏移量+4字节的长度+8字节的tagHashCode
单个consumeQueue文件默认包含30w个条目,每个条目由固定的20字节组成。那么每个consumeQueue文件的大小就是30w*20字节。可以将这个文件看成是由条目组成的数组,下标是consumeQueue的逻辑偏移量。
indexFile的组成
除了上面构建consumeQueue当做commitLog的索引文件,还有一种就是构建hash索引文件IndexFile
上图是indexFile索引文件,从图中可以看出由三部分组成:indexHead、hash槽、index条目。
- indexHead:由40个字节组成:8字节的消息最小存储时间+8字节的消息最大存储时间+8字节的开始物理偏移量+8字节的结束物理偏移量+4字节的hash槽数目+4字节的index条目数量
- hash槽:一个indexFile文件默认包含500w个hash槽,每个hash槽存储的是落在该hash槽的hashCode最新的index条目的索引
- index条目:一个indexFile文件默认包含2000w个index条目,每个index条目有20个字节组成:4字节的hashCode+8字节的消息物理便宜量+4字节的该消息存储时间和第一条消息存储时间差+4字节的前一个index索引(这个是用来标识hash冲突的)
上面所说的hash冲突是如何产生的呢?
比如不同不同key的hashCode一致或者不同key算出来的不同hashCode值对hash槽取余得到的值一致,这时候就出现了hash冲突,IndexFile是如何解决的呢?这时候preIndexNo就起到作用了。
从上面我们知道hash槽中存储的是该hashCode所对应的最新的Index条目下标,新的Index条目的最后4个字节存储的是该HashCode上一个条目的Index条目下标。这样的话是不是就解决了!如果hash槽中的值为0或者大于IndexFile最大条目或者小于-1,说明该hash槽中没有对应的Index条目。
通过上面的索引构建就可以快速定位到commitLog对应的消息。为什么?
我们在producer发送消息的时候会设置全局唯一的msgId。在消息发送的代码中:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
设置唯一id,在setUniqID方法中:
public static void setUniqID(final Message msg) {
if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());
}
}
将生成的这个唯一id放到了msg的property中。后续会在IndexFile#putKey方法中看到这个
consumeQueue和IndexFile文件的写入
说完了这两个文件的组成,回到文章开头的问题:在什么地方将数据写入这两个地方的呢?
在broker的第一篇文章中说到broker的启动分为三部分,其中在start部分中有这么一段代码:
if (this.getMessageStoreConfig().isDuplicationEnable()) {
this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
} else {
this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
}
this.reputMessageService.start();
其中reputMessageService就是将数据写入consumeQueue和IndexFile中的。
这个if是判断从哪个位置开始提取消息写入这两个文件中。isDuplicationEnable:是否允许重复转发。如果允许重复转发,将reputFromOffset设置为commitLog的提交指针,否则将reputFromOffset设置为最大偏移量
ReputMessageService继承了ServiceThread,ServiceThread实现了Runnable。所以ReputMessageService是一个异步线程,那么this.reputMessageService.start();就要看这个类的run方法:
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
Thread.sleep(1);
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
这个方法很简单,每睡眠1毫秒就调用一次reput方法。这个方法代码比较多,不一次性贴出来了,一点点说:
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
这个很简单:从commitLog文件中取出数据,位置就是上面说的reputFromOffset。得到的数据放在SelectMappedBufferResult中
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
如果存在数据,构建DispatchRequest对象,这个对象就是后续consumeQueue和IndexFile文件数据的基础:比如topic、queueId、keys、uniqKey等等
DefaultMessageStore.this.doDispatch(dispatchRequest);
转发请求,分为两个:CommitLogDispatcherBuildConsumeQueue 和 CommitLogDispatcherBuildIndex
- CommitLogDispatcherBuildConsumeQueue
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
构建consumeQueue,将数据写入consumeQueue:
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
cq.putMessagePositionInfoWrapper(dispatchRequest);
}
findConsumeQueue方法根据topic和queueId查找到对应的consumeQueue文件;将数据写入consumeQueue。
将数据写入consumeQueue文件:
1)首先判断consumeQueue文件是否可写入
2)将commitLog的偏移量+msgSize+tagHashCode写入consumeQueue
3)更新checkPoint文件
- CommitLogDispatcherBuildIndex
构建IndexFile文件,思路和consumeQueue文件大体相同,只不过组成不同
public void dispatch(DispatchRequest request) {
if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
DefaultMessageStore.this.indexService.buildIndex(request);
}
}
indexService.buildIndex(request);
public void buildIndex(DispatchRequest req) {
// 1. 获取或创建当前写入的IndexFile
IndexFile indexFile = retryGetAndCreateIndexFile();
if (indexFile != null) {
long endPhyOffset = indexFile.getEndPhyOffset();
DispatchRequest msg = req;
String topic = msg.getTopic();
String keys = msg.getKeys();
// 2. 如果 indexFile 的最大偏移量大于该消息的 commitLog offset,忽略本次构建
if (msg.getCommitLogOffset() < endPhyOffset) {
return;
}
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
break;
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
return;
}
/*
下面3和4步骤中的uniq_key 和 keys
这两个存放在消息的propertisMap中
keys:用户在发送消息的时候可以指定,多个key用","分割
uniqKey:消息唯一键,消息id并不是唯一的,因为如果消息消费重试时,发送的消息id和原来是一致的
*/
// 3. 将uniqKeys 写入 indexFile
if (req.getUniqKey() != null) {
indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
// 4. 将keys 写入 indexFile
if (keys != null && keys.length() > 0) {
String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
for (int i = 0; i < keyset.length; i++) {
String key = keyset[i];
if (key.length() > 0) {
indexFile = putKey(indexFile, msg, buildKey(topic, key));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
}
}
} else {
log.error("build index error, stop building index");
}
}
看上面代码的第三步就出现了uniqKey,这个key就是producer发送消息生成的唯一key
以上就是consumeQueue和IndexFile文件的同步。下一篇文章将broker的其他一些小点整理一下然后就进入consumer的篇章了。
本文暂时没有评论,来添加一个吧(●'◡'●)