一、前言
前置知识点铺垫
- RocketMQ中Netty服务器RemotingServer发送请求的3种方式
- RocketMQ源码分析之TopicConfigManager
- RocketMQ源码分析之ConsumerOffsetManager消费偏移量管理组件
- RocketMQ源码分析之消费者管理组件ConsumerManager
Broker2Client是broker跟客户端之间的网络交互组件,主要负责检查生产者事务状态、通知消费者ids变化、复位偏移量、查询消费状态等;
注:核心功能就是broker通过Netty服务器主动向客户端(生产者/消费者)推送请求的组件;
二、源码分析
- 检查生产者事务状态;
- 通过Netty服务器向客户端发送同步请求;
- 通知消费者ids变化请求发送;
- 复位偏移量;
- 获取消费状态;
1、检查生产者事务状态
检查生产者事务状态,这个是跟事务消息配合起来来使用的;
- 如果说生产者发送的是事务消息,此时是先发送half消息,如果成功了,此时他才会走commit,否则rollback;
- 如果说连接中断,broker来说会自己检查事务消息的状态,以及回调生产者客户端检查本地事务状态,或通过broker主动回查以及检查,确保去推进commit或者是rollback;
- 生产者先发送一个half消息,如果half消息成功了,此时就可以执行本地事务,如果本地事务成功,就提交消息,如果失败就回滚消息;
- 如果说half消息失败,或者提交/回滚消息没发送出去,此时broker自动检查half消息状态;
- 如果说一个half消息超时了一直没有提交或者回滚,此时会回查生产者,问问他本地事务是成功/失败,broker再决定这个half消息是提交/回滚;
public void checkProducerTransactionState(
// 生产者组
final String group,
// 生产者网络连接
final Channel channel,
// 检查事务状态请求头
final CheckTransactionStateRequestHeader requestHeader,
// 消息扩展数据
final MessageExt messageExt) throws Exception {
// 我发现有一个half消息超过一定timeout还没提交/回滚,没收到你生产者告诉我的本地事务状态
// 我需要构造一个请求,去主动推送给你的生产者,让生产者去查询本地事务状态
RemotingCommand request = RemotingCommand.createRequestCommand(
RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
request.setBody(MessageDecoder.encode(messageExt, false));
try {
// 通过Netty服务器发送oneway请求
this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
} catch (Exception e) {
log.error("Check transaction failed because invoke producer exception. group={}, msgId={}, error={}",
group, messageExt.getMsgId(), e.toString());
}
}
数据结构
public class CheckTransactionStateRequestHeader implements CommandCustomHeader {
@CFNotNull
private Long tranStateTableOffset; // 事务状态表偏移量
@CFNotNull
private Long commitLogOffset; // commitlog偏移量
private String msgId; // 消息id
private String transactionId; // 事务id
private String offsetMsgId; // 偏移量消息id
// 消息扩展
public class MessageExt extends Message {
private static final long serialVersionUID = 5720810158625748049L;
// 消息投递到了哪个broker组里面去
private String brokerName;
// 消息投递到了topic里面的哪个queueId里面去,这个queue一定是在一个broker组里某台机器里面
private int queueId;
// 消息存储大小
private int storeSize;
// 消息队列偏移量
private long queueOffset;
// sysflag
private int sysFlag;
// 消息诞生时间戳
private long bornTimestamp;
// 消息诞生机器地址
private SocketAddress bornHost;
// 消息存储时间戳
private long storeTimestamp;
// 消息存储机器地址
private SocketAddress storeHost;
// 消息id
private String msgId;
// 消息在commitlog里面的物理偏移量
private long commitLogOffset;
// 消息内容crc校验和
private int bodyCRC;
// 消息重新消费次数
private int reconsumeTimes;
// prepared事务消息偏移量
private long preparedTransactionOffset;
}
// 消息
public class Message implements Serializable {
private static final long serialVersionUID = 8445773977080406428L;
// 消息投递到哪个topic里去
private String topic;
// flag
private int flag;
// 消息属性
private Map<String, String> properties;
// 消息内容
private byte[] body;
// 事务消息id
private String transactionId;
}
编码就是RocketMQ自定义协议的一个过程,再通过Netty服务器RemotingServer发送oneway请求传输数据;
先构建指定大小的ByteBuffer,再按顺序写入数据,解码就是一个逆的过程,按顺序获取数据;
public static byte[] encode(MessageExt messageExt, boolean needCompress) throws Exception {
byte[] body = messageExt.getBody();
byte[] topics = messageExt.getTopic().getBytes(CHARSET_UTF8);
byte topicLen = (byte) topics.length;
String properties = messageProperties2String(messageExt.getProperties());
byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
short propertiesLength = (short) propertiesBytes.length;
int sysFlag = messageExt.getSysFlag();
int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
byte[] newBody = messageExt.getBody();
if (needCompress && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
newBody = UtilAll.compress(body, 5);
}
int bodyLength = newBody.length;
int storeSize = messageExt.getStoreSize();
// 准备好一段内存Buffer
ByteBuffer byteBuffer;
if (storeSize > 0) {
byteBuffer = ByteBuffer.allocate(storeSize);
} else {
storeSize = 4 // 1 TOTALSIZE 消息总大小
+ 4 // 2 MAGICCODE 魔数
+ 4 // 3 BODYCRC 校验和
+ 4 // 4 QUEUEID 本次消息所在topic的queueId
+ 4 // 5 FLAG 标识
+ 8 // 6 QUEUEOFFSET queue的偏移量
+ 8 // 7 PHYSICALOFFSET 物理偏移量
+ 4 // 8 SYSFLAG 系统消息标识
+ 8 // 9 BORNTIMESTAMP 诞生时间戳
+ bornhostLength // 10 BORNHOST 诞生机器地址长度
+ 8 // 11 STORETIMESTAMP 存储时间戳
+ storehostAddressLength // 12 STOREHOSTADDRESS 存储机器地址
+ 4 // 13 RECONSUMETIMES 重新消费次数
+ 8 // 14 Prepared Transaction Offset 预准备事物偏移量
+ 4 + bodyLength // 14 BODY body长度
+ 1 + topicLen // 15 TOPIC topic长度
+ 2 + propertiesLength // 16 propertiesLength properties属性长度
+ 0;
byteBuffer = ByteBuffer.allocate(storeSize);
}
// 1 TOTALSIZE
byteBuffer.putInt(storeSize);
// 2 MAGICCODE
byteBuffer.putInt(MESSAGE_MAGIC_CODE);
// 3 BODYCRC
int bodyCRC = messageExt.getBodyCRC();
byteBuffer.putInt(bodyCRC);
// 4 QUEUEID
int queueId = messageExt.getQueueId();
byteBuffer.putInt(queueId);
// 5 FLAG
int flag = messageExt.getFlag();
byteBuffer.putInt(flag);
// 6 QUEUEOFFSET
long queueOffset = messageExt.getQueueOffset();
byteBuffer.putLong(queueOffset);
// 7 PHYSICALOFFSET
long physicOffset = messageExt.getCommitLogOffset();
byteBuffer.putLong(physicOffset);
// 8 SYSFLAG
byteBuffer.putInt(sysFlag);
// 9 BORNTIMESTAMP
long bornTimeStamp = messageExt.getBornTimestamp();
byteBuffer.putLong(bornTimeStamp);
// 10 BORNHOST
InetSocketAddress bornHost = (InetSocketAddress) messageExt.getBornHost();
byteBuffer.put(bornHost.getAddress().getAddress());
byteBuffer.putInt(bornHost.getPort());
// 11 STORETIMESTAMP
long storeTimestamp = messageExt.getStoreTimestamp();
byteBuffer.putLong(storeTimestamp);
// 12 STOREHOST
InetSocketAddress serverHost = (InetSocketAddress) messageExt.getStoreHost();
byteBuffer.put(serverHost.getAddress().getAddress());
byteBuffer.putInt(serverHost.getPort());
// 13 RECONSUMETIMES
int reconsumeTimes = messageExt.getReconsumeTimes();
byteBuffer.putInt(reconsumeTimes);
// 14 Prepared Transaction Offset
long preparedTransactionOffset = messageExt.getPreparedTransactionOffset();
byteBuffer.putLong(preparedTransactionOffset);
// 15 BODY
byteBuffer.putInt(bodyLength);
byteBuffer.put(newBody);
// 16 TOPIC
byteBuffer.put(topicLen);
byteBuffer.put(topics);
// 17 properties
byteBuffer.putShort(propertiesLength);
byteBuffer.put(propertiesBytes);
return byteBuffer.array();
}
2、通过Netty服务器向客户端发送同步请求
// 便捷方法,直接把构造好的RemotingCommand通过netty网络连接发送出去
public RemotingCommand callClient(
final Channel channel,
final RemotingCommand request) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
// 通过Netty服务器发送同步请求
return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000);
}
3、通知消费者ids变化请求发送
// 消费者ids有变化,会推送给你这个事件通知
public void notifyConsumerIdsChanged(
final Channel channel,
final String consumerGroup) {
if (null == consumerGroup) {
log.error("notifyConsumerIdsChanged consumerGroup is null");
return;
}
// 构建消费者ids变化请求头
NotifyConsumerIdsChangedRequestHeader requestHeader =
new NotifyConsumerIdsChangedRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
RemotingCommand request = RemotingCommand.createRequestCommand(
RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
try {
// 通过Netty服务器发送oneway请求
this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
} catch (Exception e) {
log.error("notifyConsumerIdsChanged exception. group={}, error={}", consumerGroup, e.toString());
}
}
4、复位偏移量
public RemotingCommand resetOffset(
String topic, // topic
String group, // 消费组
long timeStamp, // 时间戳
boolean isForce, // 是否强制
boolean isC) { // 是否c语言
// 他是属于broker收到了一个请求,然后可以通过我这个组件,返回一个响应回去
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
// 获取到topic元数据
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
// 为空抛出异常
if (null == topicConfig) {
log.error("[reset-offset] reset offset failed, no topic in this broker. topic={}", topic);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic);
return response;
}
Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
// 遍历topic在我的这个broker里每一个write queue
for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
// 消息队列
MessageQueue mq = new MessageQueue();
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
mq.setTopic(topic);
mq.setQueueId(i);
// i是queueId,查询消费组对一个topic的一个queue,他消费偏移量
long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(
group, topic, i);
if (-1 == consumerOffset) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("THe consumer group <%s> not exist", group));
return response;
}
// 时间戳偏移量,要不然就是queue里的最大偏移量,要不然就是根据时间戳查询到的偏移量
long timeStampOffset;
if (timeStamp == -1) {
timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
} else {
timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
}
if (timeStampOffset < 0) {
log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
timeStampOffset = 0;
}
// 是否强制启用,或者是时间戳偏移量是否小于消费偏移量
if (isForce || timeStampOffset < consumerOffset) {
// 使用时间戳偏移量
offsetTable.put(mq, timeStampOffset);
} else {
// 使用消费偏移量
offsetTable.put(mq, consumerOffset);
}
}
// 构建一个复位偏移量的请求
ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setGroup(group);
requestHeader.setTimestamp(timeStamp);
RemotingCommand request = RemotingCommand.createRequestCommand(
RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
// 是否c语言
if (isC) {
// c++ language
ResetOffsetBodyForC body = new ResetOffsetBodyForC();
List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);
body.setOffsetTable(offsetList);
request.setBody(body.encode());
} else {
// other language
ResetOffsetBody body = new ResetOffsetBody();
body.setOffsetTable(offsetTable);
request.setBody(body.encode());
}
// 获取消费组信息
ConsumerGroupInfo consumerGroupInfo =
this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
// 遍历某一个消费组连接
ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
consumerGroupInfo.getChannelInfoTable();
for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
int version = entry.getValue().getVersion();
if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
try {
// 会对消费组里的每个消费者都推送一个复位偏移量请求
this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
topic, group, entry.getValue().getClientId());
} catch (Exception e) {
log.error("[reset-offset] reset offset exception. topic={}, group={} ,error={}",
topic, group, e.toString());
}
} else {
// 抛出系统异常
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("the client does not support this feature. version="
+ MQVersion.getVersionDesc(version));
log.warn("[reset-offset] the client does not support this feature. channel={}, version={}",
RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
return response;
}
}
} else {
String errorInfo =
String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
requestHeader.getGroup(),
requestHeader.getTopic(),
requestHeader.getTimestamp());
log.error(errorInfo);
// 抛出异常
response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
response.setRemark(errorInfo);
return response;
}
// 返回成功
response.setCode(ResponseCode.SUCCESS);
ResetOffsetBody resBody = new ResetOffsetBody();
resBody.setOffsetTable(offsetTable);
response.setBody(resBody.encode());
return response;
}
// 把偏移量table转换为偏移量list
private List<MessageQueueForC> convertOffsetTable2OffsetList(Map<MessageQueue, Long> table) {
List<MessageQueueForC> list = new ArrayList<>();
for (Entry<MessageQueue, Long> entry : table.entrySet()) {
MessageQueue mq = entry.getKey();
MessageQueueForC tmp =
new MessageQueueForC(mq.getTopic(), mq.getBrokerName(), mq.getQueueId(), entry.getValue());
list.add(tmp);
}
return list;
}
5、获取消费状态
// 获取消费状态,针对某个消费者去查询他的消费状态
public RemotingCommand getConsumeStatus(String topic, String group, String originClientId) {
final RemotingCommand result = RemotingCommand.createResponseCommand(null);
// 获取消费者状态请求头,构建好请求
GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setGroup(group);
RemotingCommand request = RemotingCommand.createRequestCommand(
RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, requestHeader);
// consumerId-><queue,offset>
Map<String, Map<MessageQueue, Long>> consumerStatusTable =
new HashMap<String, Map<MessageQueue, Long>>();
// 获取到一个消费组里所有的消费者网络连接
ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
this.brokerController.getConsumerManager().getConsumerGroupInfo(group).getChannelInfoTable();
// 如果为空,抛出系统异常
if (null == channelInfoTable || channelInfoTable.isEmpty()) {
result.setCode(ResponseCode.SYSTEM_ERROR);
result.setRemark(String.format("No Any Consumer online in the consumer group: [%s]", group));
return result;
}
// 遍历每个消费者的网络连接
for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
int version = entry.getValue().getVersion();
String clientId = entry.getValue().getClientId();
// 如果当前版本小于指定版本,抛出异常,打印日志
if (version < MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
result.setCode(ResponseCode.SYSTEM_ERROR);
result.setRemark("the client does not support this feature. version="
+ MQVersion.getVersionDesc(version));
log.warn("[get-consumer-status] the client does not support this feature. channel={}, version={}",
RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
return result;
} else if (UtilAll.isBlank(originClientId) || originClientId.equals(clientId)) {
try {
// 同步发送一个请求给消费者,获取他的消费状态,同步等待响应,超时是5s
RemotingCommand response = this.brokerController.getRemotingServer().invokeSync(
entry.getKey(), request, 5000);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
if (response.getBody() != null) {
GetConsumerStatusBody body =
GetConsumerStatusBody.decode(response.getBody(),
GetConsumerStatusBody.class);
// 把结果放入一个table里去
consumerStatusTable.put(clientId, body.getMessageQueueTable());
log.info(
"[get-consumer-status] get consumer status success. topic={}, group={}, channelRemoteAddr={}",
topic, group, clientId);
}
}
default:
break;
}
} catch (Exception e) {
log.error(
"[get-consumer-status] get consumer status exception. topic={}, group={}, error={}",
topic, group, e.toString());
}
if (!UtilAll.isBlank(originClientId) && originClientId.equals(clientId)) {
break;
}
}
}
// 返回成功
result.setCode(ResponseCode.SUCCESS);
GetConsumerStatusBody resBody = new GetConsumerStatusBody();
resBody.setConsumerTable(consumerStatusTable);
result.setBody(resBody.encode());
return result;
}
三、总结
Broker2Client的核心职责是调用底层的Netty服务器RemotingServer去实际发送请求;
- 创建序列化对象RemotingCommand;
- 调用MessageDecoder.encode构建Body体;
- 调用Netty服务器RemotingServer发送请求;
本文暂时没有评论,来添加一个吧(●'◡'●)