A Look at RocketMQ's retryAnotherBrokerWhenNotStoreOK

baijin 2024-08-15 17:04:26

This article takes a look at RocketMQ's retryAnotherBrokerWhenNotStoreOK setting.

DefaultMQProducer

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java

public class DefaultMQProducer extends ClientConfig implements MQProducer {

    private final InternalLogger log = ClientLogger.getLog();

    //......

    /**
     * Indicate whether to retry another broker on sending failure internally.
     */
    private boolean retryAnotherBrokerWhenNotStoreOK = false;

    public boolean isRetryAnotherBrokerWhenNotStoreOK() {
        return retryAnotherBrokerWhenNotStoreOK;
    }

    public void setRetryAnotherBrokerWhenNotStoreOK(boolean retryAnotherBrokerWhenNotStoreOK) {
        this.retryAnotherBrokerWhenNotStoreOK = retryAnotherBrokerWhenNotStoreOK;
    }

    //......
}
  • DefaultMQProducer has a retryAnotherBrokerWhenNotStoreOK property, which defaults to false

DefaultMQProducerImpl

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

public class DefaultMQProducerImpl implements MQProducerInner {
    private final InternalLogger log = ClientLogger.getLog();
    private final Random random = new Random();
    private final DefaultMQProducer defaultMQProducer;
    private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
        new ConcurrentHashMap<String, TopicPublishInfo>();
    private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
    private final RPCHook rpcHook;
    protected BlockingQueue<Runnable> checkRequestQueue;
    protected ExecutorService checkExecutor;
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    private MQClientInstance mQClientFactory;
    private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
    private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));

    private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();

    private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
    private final ExecutorService defaultAsyncSenderExecutor;
    private ExecutorService asyncSenderExecutor;

    //......

    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);

        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            //Reset topic with namespace during resend.
                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }

                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }

                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());

                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }

            if (sendResult != null) {
                return sendResult;
            }

            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));

            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

            MQClientException mqClientException = new MQClientException(info, exception);
            if (callTimeout) {
                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
            }

            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
            }

            throw mqClientException;
        }

        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null == nsList || nsList.isEmpty()) {
            throw new MQClientException(
                "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
        }

        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
    }

    //......
}
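  • In DefaultMQProducerImpl.sendDefaultImpl, when communicationMode is SYNC, the method checks whether sendResult.getSendStatus() is SendStatus.SEND_OK. If it is not and defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK() returns true, it executes continue and moves on to the next attempt (the loop allows at most 1 + retryTimesWhenSendFailed attempts); otherwise it returns sendResult directly. The for loop tracks lastBrokerName and passes it to selectOneMessageQueue(topicPublishInfo, lastBrokerName) on every iteration; that method delegates to mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName)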
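The SYNC branch described above can be illustrated with a small stand-alone sketch. It is a simplified model, not RocketMQ code: SyncRetrySketch, Result, and sendSync are hypothetical names, the send function stands in for sendKernelImpl, broker selection is reduced to plain round-robin over distinct broker names, and timeouts and exception handling are left out. Only the SendStatus constant names are taken from RocketMQ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model of the SYNC retry loop in sendDefaultImpl.
final class SyncRetrySketch {
    // Constant names mirror RocketMQ's SendStatus enum.
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

    // Hypothetical stand-in for SendResult: which broker answered, and with what status.
    static final class Result {
        final String broker;
        final SendStatus status;

        Result(String broker, SendStatus status) {
            this.broker = broker;
            this.status = status;
        }
    }

    // brokers: distinct broker names; send: stand-in for sendKernelImpl.
    // brokersSent collects the brokers tried, like the brokersSent array in the original.
    static Result sendSync(List<String> brokers,
                           Function<String, SendStatus> send,
                           int retryTimesWhenSendFailed,
                           boolean retryAnotherBrokerWhenNotStoreOK,
                           List<String> brokersSent) {
        int timesTotal = 1 + retryTimesWhenSendFailed;
        Result result = null;
        for (int times = 0; times < timesTotal; times++) {
            // Simplified selection: round-robin, so each retry lands on the next broker.
            String broker = brokers.get(times % brokers.size());
            brokersSent.add(broker);
            result = new Result(broker, send.apply(broker));
            if (result.status != SendStatus.SEND_OK && retryAnotherBrokerWhenNotStoreOK) {
                continue; // stored-not-OK and the flag is on: try again
            }
            return result; // SEND_OK, or the flag is off: hand the result back as-is
        }
        return result; // attempts exhausted: the last non-OK result is returned
    }

    public static void main(String[] args) {
        List<String> brokers = Arrays.asList("broker-a", "broker-b");
        Function<String, SendStatus> send =
            b -> b.equals("broker-a") ? SendStatus.FLUSH_SLAVE_TIMEOUT : SendStatus.SEND_OK;

        List<String> sent = new ArrayList<>();
        Result r = sendSync(brokers, send, 2, true, sent);
        System.out.println(r.broker + " " + r.status + " tried=" + sent);
    }
}
```

With the flag off, the caller receives the first non-OK result and has to inspect the status itself; with the flag on, the loop spends its remaining attempts before returning.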

MQFaultStrategy

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/latency/MQFaultStrategy.java

public class MQFaultStrategy {
    private final static InternalLogger log = ClientLogger.getLog();
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

    private boolean sendLatencyFaultEnable = false;

    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    public long[] getNotAvailableDuration() {
        return notAvailableDuration;
    }

    public void setNotAvailableDuration(final long[] notAvailableDuration) {
        this.notAvailableDuration = notAvailableDuration;
    }

    public long[] getLatencyMax() {
        return latencyMax;
    }

    public void setLatencyMax(final long[] latencyMax) {
        this.latencyMax = latencyMax;
    }

    public boolean isSendLatencyFaultEnable() {
        return sendLatencyFaultEnable;
    }

    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
        this.sendLatencyFaultEnable = sendLatencyFaultEnable;
    }

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }

        return 0;
    }
}
  • MQFaultStrategy.selectOneMessageQueue first checks whether sendLatencyFaultEnable is turned on. It defaults to false, in which case the method goes straight to tpInfo.selectOneMessageQueue(lastBrokerName)
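When sendLatencyFaultEnable is switched on, updateFaultItem in the listing above maps the observed latency to a back-off period through the latencyMax and notAvailableDuration tables. The sketch below reproduces only that lookup so the mapping can be tried in isolation. The class name NotAvailableDurationSketch and the helper durationFor are hypothetical; the table values and the 30000 ms isolation substitute are copied from the listing.

```java
// Hypothetical stand-alone copy of the latency-to-back-off lookup in MQFaultStrategy.
final class NotAvailableDurationSketch {
    // Default tables from the listing, in milliseconds.
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Scan from the largest threshold down and return the duration of the first one reached.
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    // Mirrors the argument passed in updateFaultItem: isolation substitutes a 30000 ms latency.
    static long durationFor(long currentLatency, boolean isolation) {
        return computeNotAvailableDuration(isolation ? 30000 : currentLatency);
    }

    public static void main(String[] args) {
        System.out.println(durationFor(40, false));   // below every threshold
        System.out.println(durationFor(600, false));  // reaches the 550 ms threshold
        System.out.println(durationFor(5, true));     // isolation: treated as 30000 ms
    }
}
```

Since sendDefaultImpl passes isolation as true in its RemotingException, MQClientException, and MQBrokerException handlers, a broker that fails this way is scored as if it had taken 30000 ms and so lands in the top row of the table (600000 ms).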

TopicPublishInfo

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java

public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;

    //......

    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }

    public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }

    //......
}
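  • When lastBrokerName is null, TopicPublishInfo.selectOneMessageQueue(lastBrokerName) calls the no-argument selectOneMessageQueue(), which picks a MessageQueue in round-robin fashion. When lastBrokerName is not null, it loops at most messageQueueList.size() times looking for a MessageQueue whose brokerName differs from lastBrokerName; if none is found (for example, every queue lives on the same broker), it falls back to the no-argument selectOneMessageQueue()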
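The selection rule can be tried out with the following stand-alone sketch. It is an approximation under stated assumptions: QueueSelectionSketch and QueueRef are hypothetical stand-ins for TopicPublishInfo and MessageQueue, and a plain AtomicInteger starting at 0 replaces ThreadLocalIndex so that the walk-through is deterministic.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for TopicPublishInfo's queue selection.
final class QueueSelectionSketch {
    // Hypothetical stand-in for MessageQueue.
    static final class QueueRef {
        final String brokerName;
        final int queueId;

        QueueRef(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final List<QueueRef> queues;
    // Assumption: a shared counter starting at 0 instead of ThreadLocalIndex.
    private final AtomicInteger sendWhichQueue = new AtomicInteger(0);

    QueueSelectionSketch(List<QueueRef> queues) {
        this.queues = queues;
    }

    // Plain round-robin over all queues.
    QueueRef selectOne() {
        int index = sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % queues.size();
        if (pos < 0) {
            pos = 0; // Math.abs(Integer.MIN_VALUE) stays negative, so guard the index
        }
        return queues.get(pos);
    }

    // Round-robin that skips queues on the broker used by the previous attempt.
    QueueRef selectOne(String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOne();
        }
        int index = sendWhichQueue.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            int pos = Math.abs(index++) % queues.size();
            if (pos < 0) {
                pos = 0;
            }
            QueueRef q = queues.get(pos);
            if (!q.brokerName.equals(lastBrokerName)) {
                return q;
            }
        }
        return selectOne(); // every queue is on lastBrokerName: fall back to round-robin
    }

    public static void main(String[] args) {
        QueueSelectionSketch info = new QueueSelectionSketch(Arrays.asList(
            new QueueRef("broker-a", 0), new QueueRef("broker-a", 1),
            new QueueRef("broker-b", 0), new QueueRef("broker-b", 1)));
        QueueRef first = info.selectOne(null);
        QueueRef retry = info.selectOne(first.brokerName);
        System.out.println(first.brokerName + " then " + retry.brokerName);
    }
}
```

The fallback at the end is what keeps a single-broker topic working: the retry simply goes back to the same broker.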

Summary

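In SYNC mode, DefaultMQProducerImpl.sendDefaultImpl checks whether sendResult.getSendStatus() is SendStatus.SEND_OK. If it is not and defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK() returns true, the loop continues to the next attempt; otherwise sendResult is returned directly. The for loop tracks lastBrokerName and passes it to selectOneMessageQueue(topicPublishInfo, lastBrokerName), which delegates to mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName). With sendLatencyFaultEnable left at its default of false, that call ends up in TopicPublishInfo.selectOneMessageQueue(lastBrokerName), which prefers a MessageQueue on a broker other than the one just tried.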

doc

  • DefaultMQProducerImpl
