专业的编程技术博客社区

网站首页 > 博客文章 正文

RocketMQ源码分析之broker与客户端之间的网络交互组件

baijin 2024-08-15 17:03:49 博客文章 20 ℃ 0 评论

#头条创作挑战赛#

一、前言

前置知识点铺垫

Broker2Client是broker跟客户端之间的网络交互组件,主要负责检查生产者事务状态、通知消费者ids变化、复位偏移量、查询消费状态等;

注:核心功能就是broker通过Netty服务器主动向客户端(生产者/消费者)推送请求的组件;

二、源码分析

  1. 检查生产者事务状态;
  2. 通过Netty服务器向客户端发送同步请求;
  3. 通知消费者ids变化请求发送;
  4. 复位偏移量;
  5. 获取消费状态;

1、检查生产者事务状态

检查生产者事务状态,这个是跟事务消息配合起来来使用的;

  1. 如果说生产者发送的是事务消息,此时是先发送half消息,如果成功了,此时他才会走commit,否则rollback;
  2. 如果说连接中断,broker来说会自己检查事务消息的状态,以及回调生产者客户端检查本地事务状态,或通过broker主动回查以及检查,确保去推进commit或者是rollback;
  3. 生产者先发送一个half消息,如果half消息成功了,此时就可以执行本地事务,如果本地事务成功,就提交消息,如果失败就回滚消息;
  4. 如果说half消息失败,或者提交/回滚消息没发送出去,此时broker自动检查half消息状态;
  5. 如果说一个half消息超时了一直没有提交或者回滚,此时会回查生产者,问问他本地事务是成功/失败,broker再决定这个half消息是提交/回滚;
public void checkProducerTransactionState(
        // 生产者组
        final String group,
        // 生产者网络连接
        final Channel channel,
        // 检查事务状态请求头
        final CheckTransactionStateRequestHeader requestHeader,
        // 消息扩展数据
        final MessageExt messageExt) throws Exception {
    // 我发现有一个half消息超过一定timeout还没提交/回滚,没收到你生产者告诉我的本地事务状态
    // 我需要构造一个请求,去主动推送给你的生产者,让生产者去查询本地事务状态
    RemotingCommand request = RemotingCommand.createRequestCommand(
            RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
    request.setBody(MessageDecoder.encode(messageExt, false));
    try {
        // 通过Netty服务器发送oneway请求
        this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
    } catch (Exception e) {
        log.error("Check transaction failed because invoke producer exception. group={}, msgId={}, error={}",
                group, messageExt.getMsgId(), e.toString());
    }
}

数据结构

public class CheckTransactionStateRequestHeader implements CommandCustomHeader {

    @CFNotNull
    private Long tranStateTableOffset; // 事务状态表偏移量
    @CFNotNull
    private Long commitLogOffset; // commitlog偏移量
    private String msgId; // 消息id
    private String transactionId; // 事务id
    private String offsetMsgId; // 偏移量消息id
// 消息扩展
public class MessageExt extends Message {

    private static final long serialVersionUID = 5720810158625748049L;

    // 消息投递到了哪个broker组里面去
    private String brokerName;
    // 消息投递到了topic里面的哪个queueId里面去,这个queue一定是在一个broker组里某台机器里面
    private int queueId;
    // 消息存储大小
    private int storeSize;
    // 消息队列偏移量
    private long queueOffset;
    // sysflag
    private int sysFlag;
    // 消息诞生时间戳
    private long bornTimestamp;
    // 消息诞生机器地址
    private SocketAddress bornHost;
    // 消息存储时间戳
    private long storeTimestamp;
    // 消息存储机器地址
    private SocketAddress storeHost;
    // 消息id
    private String msgId;
    // 消息在commitlog里面的物理偏移量
    private long commitLogOffset;
    // 消息内容crc校验和
    private int bodyCRC;
    // 消息重新消费次数
    private int reconsumeTimes;
    // prepared事务消息偏移量
    private long preparedTransactionOffset;
}
// 消息
public class Message implements Serializable {

    private static final long serialVersionUID = 8445773977080406428L;

    // 消息投递到哪个topic里去
    private String topic;
    // flag
    private int flag;
    // 消息属性
    private Map<String, String> properties;
    // 消息内容
    private byte[] body;
    // 事务消息id
    private String transactionId;

}

编码就是RocketMQ自定义协议的一个过程,再通过Netty服务器RemotingServer发送oneway请求传输数据;

先构建指定大小的ByteBuffer,再按顺序写入数据,解码就是一个逆的过程,按顺序获取数据;

public static byte[] encode(MessageExt messageExt, boolean needCompress) throws Exception {
    byte[] body = messageExt.getBody();
    byte[] topics = messageExt.getTopic().getBytes(CHARSET_UTF8);
    byte topicLen = (byte) topics.length;
    String properties = messageProperties2String(messageExt.getProperties());
    byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
    short propertiesLength = (short) propertiesBytes.length;
    int sysFlag = messageExt.getSysFlag();
    int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
    int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
    byte[] newBody = messageExt.getBody();
    if (needCompress && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
        newBody = UtilAll.compress(body, 5);
    }
    int bodyLength = newBody.length;
    int storeSize = messageExt.getStoreSize();
    // 准备好一段内存Buffer
    ByteBuffer byteBuffer;
    if (storeSize > 0) {
        byteBuffer = ByteBuffer.allocate(storeSize);
    } else {
        storeSize = 4 // 1 TOTALSIZE 消息总大小
            + 4 // 2 MAGICCODE 魔数
            + 4 // 3 BODYCRC 校验和
            + 4 // 4 QUEUEID 本次消息所在topic的queueId
            + 4 // 5 FLAG 标识
            + 8 // 6 QUEUEOFFSET queue的偏移量
            + 8 // 7 PHYSICALOFFSET 物理偏移量
            + 4 // 8 SYSFLAG 系统消息标识
            + 8 // 9 BORNTIMESTAMP 诞生时间戳
            + bornhostLength // 10 BORNHOST 诞生机器地址长度
            + 8 // 11 STORETIMESTAMP 存储时间戳
            + storehostAddressLength // 12 STOREHOSTADDRESS 存储机器地址
            + 4 // 13 RECONSUMETIMES 重新消费次数
            + 8 // 14 Prepared Transaction Offset 预准备事物偏移量
            + 4 + bodyLength // 14 BODY body长度
            + 1 + topicLen // 15 TOPIC topic长度
            + 2 + propertiesLength // 16 propertiesLength properties属性长度
            + 0;
        byteBuffer = ByteBuffer.allocate(storeSize);
    }
    // 1 TOTALSIZE
    byteBuffer.putInt(storeSize);

    // 2 MAGICCODE
    byteBuffer.putInt(MESSAGE_MAGIC_CODE);

    // 3 BODYCRC
    int bodyCRC = messageExt.getBodyCRC();
    byteBuffer.putInt(bodyCRC);

    // 4 QUEUEID
    int queueId = messageExt.getQueueId();
    byteBuffer.putInt(queueId);

    // 5 FLAG
    int flag = messageExt.getFlag();
    byteBuffer.putInt(flag);

    // 6 QUEUEOFFSET
    long queueOffset = messageExt.getQueueOffset();
    byteBuffer.putLong(queueOffset);

    // 7 PHYSICALOFFSET
    long physicOffset = messageExt.getCommitLogOffset();
    byteBuffer.putLong(physicOffset);

    // 8 SYSFLAG
    byteBuffer.putInt(sysFlag);

    // 9 BORNTIMESTAMP
    long bornTimeStamp = messageExt.getBornTimestamp();
    byteBuffer.putLong(bornTimeStamp);

    // 10 BORNHOST
    InetSocketAddress bornHost = (InetSocketAddress) messageExt.getBornHost();
    byteBuffer.put(bornHost.getAddress().getAddress());
    byteBuffer.putInt(bornHost.getPort());

    // 11 STORETIMESTAMP
    long storeTimestamp = messageExt.getStoreTimestamp();
    byteBuffer.putLong(storeTimestamp);

    // 12 STOREHOST
    InetSocketAddress serverHost = (InetSocketAddress) messageExt.getStoreHost();
    byteBuffer.put(serverHost.getAddress().getAddress());
    byteBuffer.putInt(serverHost.getPort());

    // 13 RECONSUMETIMES
    int reconsumeTimes = messageExt.getReconsumeTimes();
    byteBuffer.putInt(reconsumeTimes);

    // 14 Prepared Transaction Offset
    long preparedTransactionOffset = messageExt.getPreparedTransactionOffset();
    byteBuffer.putLong(preparedTransactionOffset);

    // 15 BODY
    byteBuffer.putInt(bodyLength);
    byteBuffer.put(newBody);

    // 16 TOPIC
    byteBuffer.put(topicLen);
    byteBuffer.put(topics);

    // 17 properties
    byteBuffer.putShort(propertiesLength);
    byteBuffer.put(propertiesBytes);

    return byteBuffer.array();
}

2、通过Netty服务器向客户端发送同步请求

// 便捷方法,直接把构造好的RemotingCommand通过netty网络连接发送出去
public RemotingCommand callClient(
        final Channel channel,
        final RemotingCommand request) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
    // 通过Netty服务器发送同步请求
    return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000);
}

3、通知消费者ids变化请求发送

// 消费者ids有变化,会推送给你这个事件通知
public void notifyConsumerIdsChanged(
    final Channel channel,
    final String consumerGroup) {
    if (null == consumerGroup) {
        log.error("notifyConsumerIdsChanged consumerGroup is null");
        return;
    }

    // 构建消费者ids变化请求头
    NotifyConsumerIdsChangedRequestHeader requestHeader =
            new NotifyConsumerIdsChangedRequestHeader();
    requestHeader.setConsumerGroup(consumerGroup);
    RemotingCommand request = RemotingCommand.createRequestCommand(
            RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);

    try {
        // 通过Netty服务器发送oneway请求
        this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
    } catch (Exception e) {
        log.error("notifyConsumerIdsChanged exception. group={}, error={}", consumerGroup, e.toString());
    }
}

4、复位偏移量

public RemotingCommand resetOffset(
        String topic, // topic
        String group, // 消费组
        long timeStamp,  // 时间戳
        boolean isForce, // 是否强制
        boolean isC) { // 是否c语言
    // 他是属于broker收到了一个请求,然后可以通过我这个组件,返回一个响应回去
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);

    // 获取到topic元数据
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
    // 为空抛出异常
    if (null == topicConfig) {
        log.error("[reset-offset] reset offset failed, no topic in this broker. topic={}", topic);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic);
        return response;
    }

    Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();

    // 遍历topic在我的这个broker里每一个write queue
    for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
        // 消息队列
        MessageQueue mq = new MessageQueue();
        mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
        mq.setTopic(topic);
        mq.setQueueId(i);

        // i是queueId,查询消费组对一个topic的一个queue,他消费偏移量
        long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(
                group, topic, i);
        if (-1 == consumerOffset) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(String.format("THe consumer group <%s> not exist", group));
            return response;
        }

        // 时间戳偏移量,要不然就是queue里的最大偏移量,要不然就是根据时间戳查询到的偏移量
        long timeStampOffset;
        if (timeStamp == -1) {
            timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
        } else {
            timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
        }

        if (timeStampOffset < 0) {
            log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
            timeStampOffset = 0;
        }

        // 是否强制启用,或者是时间戳偏移量是否小于消费偏移量
        if (isForce || timeStampOffset < consumerOffset) {
            // 使用时间戳偏移量
            offsetTable.put(mq, timeStampOffset);
        } else {
            // 使用消费偏移量
            offsetTable.put(mq, consumerOffset);
        }
    }

    // 构建一个复位偏移量的请求
    ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);
    requestHeader.setTimestamp(timeStamp);
    RemotingCommand request = RemotingCommand.createRequestCommand(
            RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
    // 是否c语言
    if (isC) {
        // c++ language
        ResetOffsetBodyForC body = new ResetOffsetBodyForC();
        List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);
        body.setOffsetTable(offsetList);
        request.setBody(body.encode());
    } else {
        // other language
        ResetOffsetBody body = new ResetOffsetBody();
        body.setOffsetTable(offsetTable);
        request.setBody(body.encode());
    }

    // 获取消费组信息
    ConsumerGroupInfo consumerGroupInfo =
            this.brokerController.getConsumerManager().getConsumerGroupInfo(group);

    if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
        // 遍历某一个消费组连接
        ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
            consumerGroupInfo.getChannelInfoTable();
        for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
            int version = entry.getValue().getVersion();
            if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
                try {
                    // 会对消费组里的每个消费者都推送一个复位偏移量请求
                    this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
                    log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
                        topic, group, entry.getValue().getClientId());
                } catch (Exception e) {
                    log.error("[reset-offset] reset offset exception. topic={}, group={} ,error={}",
                        topic, group, e.toString());
                }
            } else {
                // 抛出系统异常
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("the client does not support this feature. version="
                    + MQVersion.getVersionDesc(version));
                log.warn("[reset-offset] the client does not support this feature. channel={}, version={}",
                    RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
                return response;
            }
        }
    } else {
        String errorInfo =
            String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
                requestHeader.getGroup(),
                requestHeader.getTopic(),
                requestHeader.getTimestamp());
        log.error(errorInfo);
        // 抛出异常
        response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
        response.setRemark(errorInfo);
        return response;
    }

    // 返回成功
    response.setCode(ResponseCode.SUCCESS);
    ResetOffsetBody resBody = new ResetOffsetBody();
    resBody.setOffsetTable(offsetTable);
    response.setBody(resBody.encode());

    return response;
}
// 把偏移量table转换为偏移量list
private List<MessageQueueForC> convertOffsetTable2OffsetList(Map<MessageQueue, Long> table) {
    List<MessageQueueForC> list = new ArrayList<>();
    for (Entry<MessageQueue, Long> entry : table.entrySet()) {
        MessageQueue mq = entry.getKey();
        MessageQueueForC tmp =
            new MessageQueueForC(mq.getTopic(), mq.getBrokerName(), mq.getQueueId(), entry.getValue());
        list.add(tmp);
    }
    return list;
}

5、获取消费状态

// 获取消费状态,针对某个消费者去查询他的消费状态
public RemotingCommand getConsumeStatus(String topic, String group, String originClientId) {
    final RemotingCommand result = RemotingCommand.createResponseCommand(null);

    // 获取消费者状态请求头,构建好请求
    GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);
    RemotingCommand request = RemotingCommand.createRequestCommand(
            RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, requestHeader);

    // consumerId-><queue,offset>
    Map<String, Map<MessageQueue, Long>> consumerStatusTable =
        new HashMap<String, Map<MessageQueue, Long>>();
    // 获取到一个消费组里所有的消费者网络连接
    ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
        this.brokerController.getConsumerManager().getConsumerGroupInfo(group).getChannelInfoTable();
    // 如果为空,抛出系统异常
    if (null == channelInfoTable || channelInfoTable.isEmpty()) {
        result.setCode(ResponseCode.SYSTEM_ERROR);
        result.setRemark(String.format("No Any Consumer online in the consumer group: [%s]", group));
        return result;
    }

    // 遍历每个消费者的网络连接
    for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
        int version = entry.getValue().getVersion();
        String clientId = entry.getValue().getClientId();
        // 如果当前版本小于指定版本,抛出异常,打印日志
        if (version < MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
            result.setCode(ResponseCode.SYSTEM_ERROR);
            result.setRemark("the client does not support this feature. version="
                + MQVersion.getVersionDesc(version));
            log.warn("[get-consumer-status] the client does not support this feature. channel={}, version={}",
                RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
            return result;
        } else if (UtilAll.isBlank(originClientId) || originClientId.equals(clientId)) {
            try {
                // 同步发送一个请求给消费者,获取他的消费状态,同步等待响应,超时是5s
                RemotingCommand response = this.brokerController.getRemotingServer().invokeSync(
                        entry.getKey(), request, 5000);
                assert response != null;
                switch (response.getCode()) {
                    case ResponseCode.SUCCESS: {
                        if (response.getBody() != null) {
                            GetConsumerStatusBody body =
                                GetConsumerStatusBody.decode(response.getBody(),
                                    GetConsumerStatusBody.class);

                            // 把结果放入一个table里去
                            consumerStatusTable.put(clientId, body.getMessageQueueTable());
                            log.info(
                                "[get-consumer-status] get consumer status success. topic={}, group={}, channelRemoteAddr={}",
                                topic, group, clientId);
                        }
                    }
                    default:
                        break;
                }
            } catch (Exception e) {
                log.error(
                    "[get-consumer-status] get consumer status exception. topic={}, group={}, error={}",
                    topic, group, e.toString());
            }

            if (!UtilAll.isBlank(originClientId) && originClientId.equals(clientId)) {
                break;
            }
        }
    }

    // 返回成功
    result.setCode(ResponseCode.SUCCESS);
    GetConsumerStatusBody resBody = new GetConsumerStatusBody();
    resBody.setConsumerTable(consumerStatusTable);
    result.setBody(resBody.encode());
    return result;
}

三、总结

Broker2Client的核心职责是调用底层的Netty服务器RemotingServer去实际发送请求;

  1. 创建序列化对象RemotingCommand;
  2. 调用MessageDecoder.encode构建Body体;
  3. 调用Netty服务器RemotingServer发送请求;

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表