专业的编程技术博客社区

网站首页 > 博客文章 正文

聊聊rocketmq的ConsumeMode.ORDERLY

baijin 2024-08-23 10:35:54 博客文章 3 ℃ 0 评论

本文主要研究一下rocketmq的ConsumeMode.ORDERLY



ConsumeMode.ORDERLY

rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/annotation/ConsumeMode.java

public enum ConsumeMode {
 ?  /**
 ? ? * Receive asynchronously delivered messages concurrently
 ? ? */
 ?  CONCURRENTLY,
?
 ?  /**
 ? ? * Receive asynchronously delivered messages orderly. one queue, one thread
 ? ? */
 ?  ORDERLY
}
  • ConsumeMode定义了CONCURRENTLY、ORDERLY两个枚举值

DefaultRocketMQListenerContainer

rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

public class DefaultRocketMQListenerContainer implements InitializingBean,
 ?  RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
?
 ?  //......
?
 ?  private void initRocketMQPushConsumer() throws MQClientException {
 ? ? ?  Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
 ? ? ?  Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
 ? ? ?  Assert.notNull(nameServer, "Property 'nameServer' is required");
 ? ? ?  Assert.notNull(topic, "Property 'topic' is required");
?
 ? ? ?  RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
 ? ? ? ? ?  this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());
 ? ? ?  boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();
 ? ? ?  if (Objects.nonNull(rpcHook)) {
 ? ? ? ? ?  consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
 ? ? ? ? ? ? ?  enableMsgTrace, this.applicationContext.getEnvironment().
 ? ? ? ? ? ? ?  resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
 ? ? ? ? ?  consumer.setVipChannelEnabled(false);
 ? ? ? ? ?  consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup));
 ? ? ?  } else {
 ? ? ? ? ?  log.debug("Access-key or secret-key not configure in " + this + ".");
 ? ? ? ? ?  consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
 ? ? ? ? ? ? ? ? ?  this.applicationContext.getEnvironment().
 ? ? ? ? ? ? ? ? ?  resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
 ? ? ?  }
?
 ? ? ?  String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
 ? ? ?  if (customizedNameServer != null) {
 ? ? ? ? ?  consumer.setNamesrvAddr(customizedNameServer);
 ? ? ?  } else {
 ? ? ? ? ?  consumer.setNamesrvAddr(nameServer);
 ? ? ?  }
 ? ? ?  if (accessChannel != null) {
 ? ? ? ? ?  consumer.setAccessChannel(accessChannel);
 ? ? ?  }
 ? ? ?  consumer.setConsumeThreadMax(consumeThreadMax);
 ? ? ?  if (consumeThreadMax < consumer.getConsumeThreadMin()) {
 ? ? ? ? ?  consumer.setConsumeThreadMin(consumeThreadMax);
 ? ? ?  }
 ? ? ?  consumer.setConsumeTimeout(consumeTimeout);
?
 ? ? ?  switch (messageModel) {
 ? ? ? ? ?  case BROADCASTING:
 ? ? ? ? ? ? ?  consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
 ? ? ? ? ? ? ?  break;
 ? ? ? ? ?  case CLUSTERING:
 ? ? ? ? ? ? ?  consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
 ? ? ? ? ? ? ?  break;
 ? ? ? ? ?  default:
 ? ? ? ? ? ? ?  throw new IllegalArgumentException("Property 'messageModel' was wrong.");
 ? ? ?  }
?
 ? ? ?  switch (selectorType) {
 ? ? ? ? ?  case TAG:
 ? ? ? ? ? ? ?  consumer.subscribe(topic, selectorExpression);
 ? ? ? ? ? ? ?  break;
 ? ? ? ? ?  case SQL92:
 ? ? ? ? ? ? ?  consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
 ? ? ? ? ? ? ?  break;
 ? ? ? ? ?  default:
 ? ? ? ? ? ? ?  throw new IllegalArgumentException("Property 'selectorType' was wrong.");
 ? ? ?  }
?
 ? ? ?  switch (consumeMode) {
 ? ? ? ? ?  case ORDERLY:
 ? ? ? ? ? ? ?  consumer.setMessageListener(new DefaultMessageListenerOrderly());
 ? ? ? ? ? ? ?  break;
 ? ? ? ? ?  case CONCURRENTLY:
 ? ? ? ? ? ? ?  consumer.setMessageListener(new DefaultMessageListenerConcurrently());
 ? ? ? ? ? ? ?  break;
 ? ? ? ? ?  default:
 ? ? ? ? ? ? ?  throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
 ? ? ?  }
?
 ? ? ?  if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
 ? ? ? ? ?  ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
 ? ? ?  }
?
 ?  }
?
}
  • initRocketMQPushConsumer方法对于consumeMode为ORDERLY的设置的messageListener为DefaultMessageListenerOrderly

DefaultMessageListenerOrderly

rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

 ?  public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
?
 ? ? ?  @SuppressWarnings("unchecked")
 ? ? ?  @Override
 ? ? ?  public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
 ? ? ? ? ?  for (MessageExt messageExt : msgs) {
 ? ? ? ? ? ? ?  log.debug("received msg: {}", messageExt);
 ? ? ? ? ? ? ?  try {
 ? ? ? ? ? ? ? ? ?  long now = System.currentTimeMillis();
 ? ? ? ? ? ? ? ? ?  rocketMQListener.onMessage(doConvertMessage(messageExt));
 ? ? ? ? ? ? ? ? ?  long costTime = System.currentTimeMillis() - now;
 ? ? ? ? ? ? ? ? ?  log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
 ? ? ? ? ? ? ?  } catch (Exception e) {
 ? ? ? ? ? ? ? ? ?  log.warn("consume message failed. messageExt:{}", messageExt, e);
 ? ? ? ? ? ? ? ? ?  context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
 ? ? ? ? ? ? ? ? ?  return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
 ? ? ? ? ? ? ?  }
 ? ? ? ? ?  }
?
 ? ? ? ? ?  return ConsumeOrderlyStatus.SUCCESS;
 ? ? ?  }
 ?  }
  • DefaultMessageListenerOrderly实现了MessageListenerOrderly接口,其consumeMessage方法遍历msgs,然后挨个调用rocketMQListener.onMessage(doConvertMessage(messageExt)),若循环中出现异常被捕获住了会执行context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis),然后返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT

ConsumeMessageOrderlyService

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java

 ?  class ConsumeRequest implements Runnable {
 ? ? ?  private final ProcessQueue processQueue;
 ? ? ?  private final MessageQueue messageQueue;
?
 ? ? ?  public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
 ? ? ? ? ?  this.processQueue = processQueue;
 ? ? ? ? ?  this.messageQueue = messageQueue;
 ? ? ?  }
?
 ? ? ?  public ProcessQueue getProcessQueue() {
 ? ? ? ? ?  return processQueue;
 ? ? ?  }
?
 ? ? ?  public MessageQueue getMessageQueue() {
 ? ? ? ? ?  return messageQueue;
 ? ? ?  }
?
 ? ? ?  @Override
 ? ? ?  public void run() {
 ? ? ? ? ?  if (this.processQueue.isDropped()) {
 ? ? ? ? ? ? ?  log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
 ? ? ? ? ? ? ?  return;
 ? ? ? ? ?  }
?
 ? ? ? ? ?  final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
 ? ? ? ? ?  synchronized (objLock) {
 ? ? ? ? ? ? ?  if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
 ? ? ? ? ? ? ? ? ?  || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
 ? ? ? ? ? ? ? ? ?  final long beginTime = System.currentTimeMillis();
 ? ? ? ? ? ? ? ? ?  for (boolean continueConsume = true; continueConsume; ) {
 ? ? ? ? ? ? ? ? ? ? ?  if (this.processQueue.isDropped()) {
 ? ? ? ? ? ? ? ? ? ? ? ? ?  log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
 ? ? ? ? ? ? ? ? ? ? ? ? ?  break;
 ? ? ? ? ? ? ? ? ? ? ?  }
?
 ? ? ? ? ? ? ? ? ? ? ?  if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
 ? ? ? ? ? ? ? ? ? ? ? ? ?  && !this.processQueue.isLocked()) {
 ? ? ? ? ? ? ? ? ? ? ? ? ?  log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
 ? ? ? ? ? ? ? ? ? ? ? ? ?  ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
 ? ? ? ? ? ? ? ? ? ? ? ? ?  break;
 ? ? ? ? ? ? ? ? ? ? ?  }
?
 ? ? ? ? ? ? ? ? ? ? ?  if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
 ? ? ? ? ? ? ? ? ? ? ? ? ?  && this.processQueue.isLockExpired()) {
 ? ? ? ? ? ? ? ? ? ? ? ? ?  log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
 ? ? ? ? ? ? ? ? ? ? ? ? ?  ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
 ? ? ? ? ? ? ? ? ? ? ? ? ?  break;
 ? ? ? ? ? ? ? ? ? ? ?  }
?
 ? ? ? ? ? ? ? ? ? ? ?  long interval = System.currentTimeMillis() - beginTime;
 ? ? ? ? ? ? ? ? ? ? ?  if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
 ? ? ? ? ? ? ? ? ? ? ? ? ?  ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
 ? ? ? ? ? ? ? ? ? ? ? ? ?  break;
 ? ? ? ? ? ? ? ? ? ? ?  }
?
 ? ? ? ? ? ? ? ? ? ? ?  final int consumeBatchSize =
 ? ? ? ? ? ? ? ? ? ? ? ? ?  ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
?
 ? ? ? ? ? ? ? ? ? ? ?  List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
 ? ? ? ? ? ? ? ? ? ? ?  defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
 ? ? ? ? ? ? ? ? ? ? ?  if (!msgs.isEmpty()) {
 ? ? ? ? ? ? ? ? ? ? ? ? ?  final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
?
 ? ? ? ? ? ? ? ? ? ? ? ? ?  ConsumeOrderlyStatus status = null;
?
 ? ? ? ? ? ? ? ? ? ? ? ? ?  ConsumeMessageContext consumeMessageContext = null;
 ? ? ? ? ? ? ? ? ? ? ? ? ?  if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  consumeMessageContext = new ConsumeMessageContext();
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  consumeMessageContext
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  consumeMessageContext.setMq(messageQueue);
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  consumeMessageContext.setMsgList(msgs);
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  consumeMessageContext.setSuccess(false);
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  // init the consume context type
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  consumeMessageContext.setProps(new HashMap<String, String>());
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
 ? ? ? ? ? ? ? ? ? ? ? ? ?  }
?
 ? ? ? ? ? ? ? ? ? ? ? ? ?  long beginTimestamp = System.currentTimeMillis();
 ? ? ? ? ? ? ? ? ? ? ? ? ?  ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
 ? ? ? ? ? ? ? ? ? ? ? ? ?  boolean hasException = false;
 ? ? ? ? ? ? ? ? ? ? ? ? ?  try {
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  this.processQueue.getLockConsume().lock();
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  if (this.processQueue.isDropped()) {
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  this.messageQueue);
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  break;
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  }
?
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
 ? ? ? ? ? ? ? ? ? ? ? ? ?  } catch (Throwable e) {
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  RemotingHelper.exceptionSimpleDesc(e),
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  ConsumeMessageOrderlyService.this.consumerGroup,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  msgs,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  messageQueue);
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  hasException = true;
 ? ? ? ? ? ? ? ? ? ? ? ? ?  } finally {
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  this.processQueue.getLockConsume().unlock();
 ? ? ? ? ? ? ? ? ? ? ? ? ?  }
?
 ? ? ? ? ? ? ? ? ? ? ? ? ?  if (null == status
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  || ConsumeOrderlyStatus.ROLLBACK == status
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  ConsumeMessageOrderlyService.this.consumerGroup,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  msgs,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  messageQueue);
 ? ? ? ? ? ? ? ? ? ? ? ? ?  }
?
 ? ? ? ? ? ? ? ? ? ? ? ? ?  long consumeRT = System.currentTimeMillis() - beginTimestamp;
 ? ? ? ? ? ? ? ? ? ? ? ? ?  if (null == status) {
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  if (hasException) {
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  returnType = ConsumeReturnType.EXCEPTION;
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  } else {
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  returnType = ConsumeReturnType.RETURNNULL;
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  }
 ? ? ? ? ? ? ? ? ? ? ? ? ?  } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  returnType = ConsumeReturnType.TIME_OUT;
 ? ? ? ? ? ? ? ? ? ? ? ? ?  } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  returnType = ConsumeReturnType.FAILED;
 ? ? ? ? ? ? ? ? ? ? ? ? ?  } else if (ConsumeOrderlyStatus.SUCCESS == status) {
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  returnType = ConsumeReturnType.SUCCESS;
 ? ? ? ? ? ? ? ? ? ? ? ? ?  }
?
 ? ? ? ? ? ? ? ? ? ? ? ? ?  if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
 ? ? ? ? ? ? ? ? ? ? ? ? ?  }
?
 ? ? ? ? ? ? ? ? ? ? ? ? ?  if (null == status) {
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
 ? ? ? ? ? ? ? ? ? ? ? ? ?  }
?
 ? ? ? ? ? ? ? ? ? ? ? ? ?  if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  consumeMessageContext.setStatus(status.toString());
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  consumeMessageContext
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
 ? ? ? ? ? ? ? ? ? ? ? ? ?  }
?
 ? ? ? ? ? ? ? ? ? ? ? ? ?  ConsumeMessageOrderlyService.this.getConsumerStatsManager()
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?  .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
?
 ? ? ? ? ? ? ? ? ? ? ? ? ?  continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
 ? ? ? ? ? ? ? ? ? ? ?  } else {
 ? ? ? ? ? ? ? ? ? ? ? ? ?  continueConsume = false;
 ? ? ? ? ? ? ? ? ? ? ?  }
 ? ? ? ? ? ? ? ? ?  }
 ? ? ? ? ? ? ?  } else {
 ? ? ? ? ? ? ? ? ?  if (this.processQueue.isDropped()) {
 ? ? ? ? ? ? ? ? ? ? ?  log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
 ? ? ? ? ? ? ? ? ? ? ?  return;
 ? ? ? ? ? ? ? ? ?  }
?
 ? ? ? ? ? ? ? ? ?  ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
 ? ? ? ? ? ? ?  }
 ? ? ? ? ?  }
 ? ? ?  }
?
 ?  }
  • ConsumeRequest实现了Runnable接口,它的构造器要求传入processQueue、messageQueue参数;其run 方法首先通过messageQueueLock.fetchLockObject(this.messageQueue)获取objLock,之后synchronized该objLock进行后续操作
  • 对于messageModel非MessageModel.BROADCASTING的且(this.processQueue.isLocked() && !this.processQueue.isLockExpired())不成立的则执行tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100)
  • 之后通过processQueue.takeMessags(consumeBatchSize),然后执行processQueue.getLockConsume().lock(),再执行messageListener.consumeMessage(Collections.unmodifiableList(msgs), context),最后在finally执行processQueue.getLockConsume().unlock(),之后通过ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this)处理ConsumeOrderlyStatus

ProcessQueue

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java

public class ProcessQueue {
?
    //......
?
 ?  public List<MessageExt> takeMessags(final int batchSize) {
 ? ? ?  List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
 ? ? ?  final long now = System.currentTimeMillis();
 ? ? ?  try {
 ? ? ? ? ?  this.lockTreeMap.writeLock().lockInterruptibly();
 ? ? ? ? ?  this.lastConsumeTimestamp = now;
 ? ? ? ? ?  try {
 ? ? ? ? ? ? ?  if (!this.msgTreeMap.isEmpty()) {
 ? ? ? ? ? ? ? ? ?  for (int i = 0; i < batchSize; i++) {
 ? ? ? ? ? ? ? ? ? ? ?  Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
 ? ? ? ? ? ? ? ? ? ? ?  if (entry != null) {
 ? ? ? ? ? ? ? ? ? ? ? ? ?  result.add(entry.getValue());
 ? ? ? ? ? ? ? ? ? ? ? ? ?  consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());
 ? ? ? ? ? ? ? ? ? ? ?  } else {
 ? ? ? ? ? ? ? ? ? ? ? ? ?  break;
 ? ? ? ? ? ? ? ? ? ? ?  }
 ? ? ? ? ? ? ? ? ?  }
 ? ? ? ? ? ? ?  }
?
 ? ? ? ? ? ? ?  if (result.isEmpty()) {
 ? ? ? ? ? ? ? ? ?  consuming = false;
 ? ? ? ? ? ? ?  }
 ? ? ? ? ?  } finally {
 ? ? ? ? ? ? ?  this.lockTreeMap.writeLock().unlock();
 ? ? ? ? ?  }
 ? ? ?  } catch (InterruptedException e) {
 ? ? ? ? ?  log.error("take Messages exception", e);
 ? ? ?  }
?
 ? ? ?  return result;
 ?  }
?
 ?  //......
}
  • takeMessags方法先执行lockTreeMap.writeLock().lockInterruptibly(),然后执行msgTreeMap.pollFirstEntry()及consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());最后finally执行lockTreeMap.writeLock().unlock()

小结

DefaultMessageListenerOrderly实现了MessageListenerOrderly接口,其consumeMessage方法遍历msgs,然后挨个调用rocketMQListener.onMessage(doConvertMessage(messageExt)),若循环中出现异常被捕获住了会执行context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis),然后返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT

doc

  • ConsumeMessageOrderlyService

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表