A look at RocketMQ's sendHeartbeatToAllBrokerWithLock

baijin 2024-08-15 17:03:50

This post walks through RocketMQ's sendHeartbeatToAllBrokerWithLock: how the client sends heartbeats and how the broker handles them.



sendHeartbeatToAllBrokerWithLock

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

public class MQClientInstance {
    private final static long LOCK_TIMEOUT_MILLIS = 3000;
    private final InternalLogger log = ClientLogger.getLog();

    //......

    public void sendHeartbeatToAllBrokerWithLock() {
        if (this.lockHeartbeat.tryLock()) {
            try {
                this.sendHeartbeatToAllBroker();
                this.uploadFilterClassSource();
            } catch (final Exception e) {
                log.error("sendHeartbeatToAllBroker exception", e);
            } finally {
                this.lockHeartbeat.unlock();
            }
        } else {
            log.warn("lock heartBeat, but failed.");
        }
    }

    private void sendHeartbeatToAllBroker() {
        final HeartbeatData heartbeatData = this.prepareHeartbeatData();
        final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
        final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
        if (producerEmpty && consumerEmpty) {
            log.warn("sending heartbeat, but no consumer and no producer");
            return;
        }

        if (!this.brokerAddrTable.isEmpty()) {
            long times = this.sendHeartbeatTimesTotal.getAndIncrement();
            Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, HashMap<Long, String>> entry = it.next();
                String brokerName = entry.getKey();
                HashMap<Long, String> oneTable = entry.getValue();
                if (oneTable != null) {
                    for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                        Long id = entry1.getKey();
                        String addr = entry1.getValue();
                        if (addr != null) {
                            if (consumerEmpty) {
                                if (id != MixAll.MASTER_ID)
                                    continue;
                            }

                            try {
                                int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                                if (!this.brokerVersionTable.containsKey(brokerName)) {
                                    this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                                }
                                this.brokerVersionTable.get(brokerName).put(addr, version);
                                if (times % 20 == 0) {
                                    log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                                    log.info(heartbeatData.toString());
                                }
                            } catch (Exception e) {
                                if (this.isBrokerInNameServer(addr)) {
                                    log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
                                } else {
                                    log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
                                        id, addr, e);
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    private void uploadFilterClassSource() {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> next = it.next();
            MQConsumerInner consumer = next.getValue();
            if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {
                Set<SubscriptionData> subscriptions = consumer.subscriptions();
                for (SubscriptionData sub : subscriptions) {
                    if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {
                        final String consumerGroup = consumer.groupName();
                        final String className = sub.getSubString();
                        final String topic = sub.getTopic();
                        final String filterClassSource = sub.getFilterClassSource();
                        try {
                            this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);
                        } catch (Exception e) {
                            log.error("uploadFilterClassToAllFilterServer Exception", e);
                        }
                    }
                }
            }
        }
    }

    //......
}
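  • MQClientInstance's sendHeartbeatToAllBrokerWithLock method first calls lockHeartbeat.tryLock(). If the lock is acquired, it runs sendHeartbeatToAllBroker() and then uploadFilterClassSource(), logs any exception, and releases the lock with lockHeartbeat.unlock() in the finally block. If the lock is not acquired, it only logs a warning and returns without sending anything.
  • sendHeartbeatToAllBroker builds heartbeatData via prepareHeartbeatData and returns early when both the producer and the consumer data sets are empty. Otherwise it iterates over brokerAddrTable and calls mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000) for every non-null address, skipping non-master brokers when there are no consumers. The returned version is stored in brokerVersionTable, and a success message is logged once every 20 rounds. (sendHearbeat is the method name as spelled in the 4.6.0 source.)
  • uploadFilterClassSource iterates over consumerTable. For each consumer whose consumeType is ConsumeType.CONSUME_PASSIVELY it walks the subscriptions, and for every subscription where sub.isClassFilterMode() is true and sub.getFilterClassSource() is not null it calls uploadFilterClassToAllFilterServer.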
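
The broker-selection rule inside sendHeartbeatToAllBroker is easy to miss in the nested loops, so here is a minimal standalone sketch of it. The class HeartbeatTargetSketch, the method selectTargets, and the sample addresses are hypothetical names made up for illustration; MASTER_ID is redefined locally to mirror MixAll.MASTER_ID (0), and plain Map types stand in for the client's brokerAddrTable.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class HeartbeatTargetSketch {
    // Local copy of MixAll.MASTER_ID so the sketch has no RocketMQ dependency.
    static final long MASTER_ID = 0L;

    /** Returns the addresses that would receive a heartbeat, in iteration order. */
    static List<String> selectTargets(Map<String, Map<Long, String>> brokerAddrTable,
                                      boolean consumerEmpty) {
        List<String> targets = new ArrayList<>();
        for (Map<Long, String> oneTable : brokerAddrTable.values()) {
            if (oneTable == null) {
                continue;
            }
            for (Map.Entry<Long, String> entry : oneTable.entrySet()) {
                String addr = entry.getValue();
                if (addr == null) {
                    continue;
                }
                // A producer-only client skips slaves and heartbeats masters only.
                if (consumerEmpty && entry.getKey() != MASTER_ID) {
                    continue;
                }
                targets.add(addr);
            }
        }
        return targets;
    }

    /** Hypothetical sample data: one broker name with a master (id 0) and a slave (id 1). */
    static Map<String, Map<Long, String>> sampleTable() {
        Map<Long, String> brokerA = new LinkedHashMap<>();
        brokerA.put(0L, "10.0.0.1:10911");
        brokerA.put(1L, "10.0.0.2:10911");
        Map<String, Map<Long, String>> table = new LinkedHashMap<>();
        table.put("broker-a", brokerA);
        return table;
    }

    public static void main(String[] args) {
        System.out.println(selectTargets(sampleTable(), true));  // producer-only client
        System.out.println(selectTargets(sampleTable(), false)); // client with consumers
    }
}
```

With the sample table, a producer-only client selects just the master address, while a client that has consumers selects both the master and the slave.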

sendHearbeat

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

public class MQClientAPIImpl {

    private final static InternalLogger log = ClientLogger.getLog();
    private static boolean sendSmartMsg =
        Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));

    static {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    }

    //......

    public int sendHearbeat(
        final String addr,
        final HeartbeatData heartbeatData,
        final long timeoutMillis
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
        request.setLanguage(clientConfig.getLanguage());
        request.setBody(heartbeatData.encode());
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                return response.getVersion();
            }
            default:
                break;
        }

        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    //......
}
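  • MQClientAPIImpl's sendHearbeat method builds the request with RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null), sets the client language and the encoded heartbeatData as the body, and sends it synchronously via remotingClient.invokeSync(addr, request, timeoutMillis). On ResponseCode.SUCCESS it returns response.getVersion(); for any other code it throws MQBrokerException(response.getCode(), response.getRemark()). Note that the assert response != null line is only checked when the JVM runs with assertions enabled.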
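
The "return the version on success, otherwise throw" shape of sendHearbeat can be sketched without any RocketMQ classes. Everything below is a hypothetical stand-in: Response replaces RemotingCommand, BrokerException replaces MQBrokerException (made unchecked here for brevity), SUCCESS is assumed to play the role of ResponseCode.SUCCESS, and the numeric codes and version used in the usage note are arbitrary.

```java
class SendHeartbeatSketch {
    // Assumption: stands in for ResponseCode.SUCCESS.
    static final int SUCCESS = 0;

    /** Hypothetical stand-in for RemotingCommand with only the fields the sketch needs. */
    static final class Response {
        final int code;
        final int version;
        final String remark;

        Response(int code, int version, String remark) {
            this.code = code;
            this.version = version;
            this.remark = remark;
        }
    }

    /** Hypothetical stand-in for MQBrokerException; carries the broker's response code. */
    static final class BrokerException extends RuntimeException {
        final int responseCode;

        BrokerException(int responseCode, String remark) {
            super("CODE: " + responseCode + " DESC: " + remark);
            this.responseCode = responseCode;
        }
    }

    /** Maps a broker response to the broker version, or fails with the broker's code and remark. */
    static int versionOrThrow(Response response) {
        if (response.code == SUCCESS) {
            return response.version;
        }
        throw new BrokerException(response.code, response.remark);
    }
}
```

For example, versionOrThrow(new Response(0, 355, null)) returns 355, while a response with code 1 raises BrokerException. In the client shown earlier, that exception is what lands in the catch block of sendHeartbeatToAllBroker, where it is logged rather than rethrown.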

processRequest

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java

public class ClientManageProcessor implements NettyRequestProcessor {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final BrokerController brokerController;

    public ClientManageProcessor(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    //......

    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.HEART_BEAT:
                return this.heartBeat(ctx, request);
            case RequestCode.UNREGISTER_CLIENT:
                return this.unregisterClient(ctx, request);
            case RequestCode.CHECK_CLIENT_CONFIG:
                return this.checkClientConfig(ctx, request);
            default:
                break;
        }
        return null;
    }

    public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
            ctx.channel(),
            heartbeatData.getClientID(),
            request.getLanguage(),
            request.getVersion()
        );

        for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
            SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                    data.getGroupName());
            boolean isNotifyConsumerIdsChangedEnable = true;
            if (null != subscriptionGroupConfig) {
                isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
                int topicSysFlag = 0;
                if (data.isUnitMode()) {
                    topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
                }
                String newTopic = MixAll.getRetryTopic(data.getGroupName());
                this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                    newTopic,
                    subscriptionGroupConfig.getRetryQueueNums(),
                    PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
            }

            boolean changed = this.brokerController.getConsumerManager().registerConsumer(
                data.getGroupName(),
                clientChannelInfo,
                data.getConsumeType(),
                data.getMessageModel(),
                data.getConsumeFromWhere(),
                data.getSubscriptionDataSet(),
                isNotifyConsumerIdsChangedEnable
            );

            if (changed) {
                log.info("registerConsumer info changed {} {}",
                    data.toString(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                );
            }
        }

        for (ProducerData data : heartbeatData.getProducerDataSet()) {
            this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
                clientChannelInfo);
        }
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    //......
}
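  • On the broker side, processRequest dispatches RequestCode.HEART_BEAT to heartBeat(ctx, request). That method decodes the request body into heartbeatData, builds a ClientChannelInfo from the channel, client ID, language, and version, processes the consumerData and producerData entries separately, and returns a response with ResponseCode.SUCCESS.
  • For each consumerData entry it looks up the group's subscriptionGroupConfig. If the config is not null, it calls brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod for the group's retry topic (the name comes from MixAll.getRetryTopic). In either case it then calls brokerController.getConsumerManager().registerConsumer, and logs when the call reports a change.
  • For each producerData entry it calls brokerController.getProducerManager().registerProducer(data.getGroupName(), clientChannelInfo).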
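
To make the "register on every heartbeat, report whether anything changed" idea concrete, here is a deliberately simplified model. It is not the broker's ConsumerManager or ProducerManager: ClientRegistrySketch and its methods are hypothetical, clients are keyed by a plain client ID string rather than by channel, and subscriptions, notifications, and expiry scanning are left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ClientRegistrySketch {
    // group name -> (client ID -> timestamp of the last heartbeat, in milliseconds)
    private final Map<String, Map<String, Long>> groups = new ConcurrentHashMap<>();

    /**
     * Records a heartbeat for the client and refreshes its timestamp.
     * Returns true only when the client was not yet known in this group.
     */
    boolean register(String group, String clientId, long nowMillis) {
        Map<String, Long> clients = groups.computeIfAbsent(group, g -> new ConcurrentHashMap<>());
        return clients.put(clientId, nowMillis) == null;
    }

    /** Number of clients currently registered in the group (0 for an unknown group). */
    int clientCount(String group) {
        Map<String, Long> clients = groups.get(group);
        return clients == null ? 0 : clients.size();
    }
}
```

In this model the first heartbeat from a client returns true, comparable to the changed flag that triggers the "registerConsumer info changed" log line, and later heartbeats from the same client return false while still refreshing the timestamp.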

Summary

  • sendHeartbeatToAllBrokerWithLock guards the heartbeat with lockHeartbeat.tryLock(): it runs sendHeartbeatToAllBroker() and uploadFilterClassSource() only when the lock is acquired, always unlocks in finally, and skips the round with a warning otherwise.
  • sendHeartbeatToAllBroker builds heartbeatData via prepareHeartbeatData, iterates over brokerAddrTable, and calls mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000) for each target broker.
  • uploadFilterClassSource iterates over consumerTable and, for CONSUME_PASSIVELY consumers, calls uploadFilterClassToAllFilterServer for every subscription in class filter mode that has a non-null filter class source.
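
The skip-if-busy behaviour of the tryLock guard is the main point of the WithLock variant, so a small standalone demonstration may help. This is a sketch under assumptions, not the client's code: HeartbeatGuardSketch, runWithLock, and demo are hypothetical names, and the lock is assumed to be a ReentrantLock (the excerpt does not show the declaration of lockHeartbeat).

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class HeartbeatGuardSketch {
    private final Lock lockHeartbeat = new ReentrantLock();

    /** Runs the task only if the lock is free right now; returns false instead of waiting. */
    boolean runWithLock(Runnable task) {
        if (lockHeartbeat.tryLock()) {
            try {
                task.run();
            } catch (RuntimeException e) {
                // Like the original, a failing round is reported and does not propagate.
                System.err.println("heartbeat task failed: " + e);
            } finally {
                lockHeartbeat.unlock();
            }
            return true;
        }
        return false;
    }

    /**
     * Returns {outerRan, innerRan}. The inner call is made from a second thread
     * while the first thread still holds the lock, so it is expected to be skipped.
     */
    static Boolean[] demo() {
        HeartbeatGuardSketch guard = new HeartbeatGuardSketch();
        Boolean[] ran = new Boolean[2];
        ran[0] = guard.runWithLock(() -> {
            Thread other = new Thread(() -> ran[1] = guard.runWithLock(() -> { }));
            other.start();
            try {
                other.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return ran;
    }

    public static void main(String[] args) {
        Boolean[] ran = demo();
        System.out.println("outer ran: " + ran[0] + ", inner ran: " + ran[1]);
    }
}
```

The second thread is needed because a ReentrantLock can be re-acquired by the thread that already holds it. The design trade-off is that a caller that loses the race does not queue up behind the current round; it simply drops its heartbeat attempt.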

doc

  • MQClientInstance
