专业的编程技术博客社区

网站首页 > 博客文章 正文

聊聊rocketmq的DefaultRocketMQListenerContainer

baijin 2024-08-29 12:11:55 博客文章 3 ℃ 0 评论

本文主要研究一下rocketmq的DefaultRocketMQListenerContainer

DefaultRocketMQListenerContainer

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

public class DefaultRocketMQListenerContainer implements InitializingBean,
 RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
 private final static Logger log = LoggerFactory.getLogger(DefaultRocketMQListenerContainer.class);
?
 private ApplicationContext applicationContext;
?
 /**
 * The name of the DefaultRocketMQListenerContainer instance
 */
 private String name;
?
 private long suspendCurrentQueueTimeMillis = 1000;
?
 /**
 * Message consume retry strategy<br> -1,no retry,put into DLQ directly<br> 0,broker control retry frequency<br>
 * >0,client control retry frequency.
 */
 private int delayLevelWhenNextConsume = 0;
?
 private String nameServer;
?
 private AccessChannel accessChannel = AccessChannel.LOCAL;
?
 private String consumerGroup;
?
 private String topic;
?
 private int consumeThreadMax = 64;
?
 private String charset = "UTF-8";
?
 private ObjectMapper objectMapper;
?
 private RocketMQListener rocketMQListener;
?
 private RocketMQMessageListener rocketMQMessageListener;
?
 private DefaultMQPushConsumer consumer;
?
 private Class messageType;
?
 private boolean running;
?
 // The following properties came from @RocketMQMessageListener.
 private ConsumeMode consumeMode;
 private SelectorType selectorType;
 private String selectorExpression;
 private MessageModel messageModel;
 private long consumeTimeout;
?
 //......
?
 public void setRocketMQMessageListener(RocketMQMessageListener anno) {
 this.rocketMQMessageListener = anno;
?
 this.consumeMode = anno.consumeMode();
 this.consumeThreadMax = anno.consumeThreadMax();
 this.messageModel = anno.messageModel();
 this.selectorExpression = anno.selectorExpression();
 this.selectorType = anno.selectorType();
 this.consumeTimeout = anno.consumeTimeout();
 }
?
 @Override
 public void setupMessageListener(RocketMQListener rocketMQListener) {
 this.rocketMQListener = rocketMQListener;
 }
?
 @Override
 public void destroy() {
 this.setRunning(false);
 if (Objects.nonNull(consumer)) {
 consumer.shutdown();
 }
 log.info("container destroyed, {}", this.toString());
 }
?
 @Override
 public boolean isAutoStartup() {
 return true;
 }
?
 @Override
 public void stop(Runnable callback) {
 stop();
 callback.run();
 }
?
 @Override
 public void start() {
 if (this.isRunning()) {
 throw new IllegalStateException("container already running. " + this.toString());
 }
?
 try {
 consumer.start();
 } catch (MQClientException e) {
 throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
 }
 this.setRunning(true);
?
 log.info("running container: {}", this.toString());
 }
?
 @Override
 public void stop() {
 if (this.isRunning()) {
 if (Objects.nonNull(consumer)) {
 consumer.shutdown();
 }
 setRunning(false);
 }
 }
?
 @Override
 public boolean isRunning() {
 return running;
 }
?
 private void setRunning(boolean running) {
 this.running = running;
 }
?
 @Override
 public int getPhase() {
 // Returning Integer.MAX_VALUE only suggests that
 // we will be the first bean to shutdown and last bean to start
 return Integer.MAX_VALUE;
 }
?
?
 @Override
 public void afterPropertiesSet() throws Exception {
 initRocketMQPushConsumer();
?
 this.messageType = getMessageType();
 log.debug("RocketMQ messageType: {}", messageType.getName());
 }
?
 @Override
 public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
 this.applicationContext = applicationContext;
 }
?
 @Override
 public String toString() {
 return "DefaultRocketMQListenerContainer{" +
 "consumerGroup='" + consumerGroup + '\'' +
 ", nameServer='" + nameServer + '\'' +
 ", topic='" + topic + '\'' +
 ", consumeMode=" + consumeMode +
 ", selectorType=" + selectorType +
 ", selectorExpression='" + selectorExpression + '\'' +
 ", messageModel=" + messageModel +
 '}';
 }
?
 private void initRocketMQPushConsumer() throws MQClientException {
 Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
 Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
 Assert.notNull(nameServer, "Property 'nameServer' is required");
 Assert.notNull(topic, "Property 'topic' is required");
?
 RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
 this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());
 boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();
 if (Objects.nonNull(rpcHook)) {
 consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
 enableMsgTrace, this.applicationContext.getEnvironment().
 resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
 consumer.setVipChannelEnabled(false);
 consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup));
 } else {
 log.debug("Access-key or secret-key not configure in " + this + ".");
 consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
 this.applicationContext.getEnvironment().
 resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
 }
?
 String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
 if (customizedNameServer != null) {
 consumer.setNamesrvAddr(customizedNameServer);
 } else {
 consumer.setNamesrvAddr(nameServer);
 }
 if (accessChannel != null) {
 consumer.setAccessChannel(accessChannel);
 }
 consumer.setConsumeThreadMax(consumeThreadMax);
 if (consumeThreadMax < consumer.getConsumeThreadMin()) {
 consumer.setConsumeThreadMin(consumeThreadMax);
 }
 consumer.setConsumeTimeout(consumeTimeout);
 consumer.setInstanceName(this.name);
?
 switch (messageModel) {
 case BROADCASTING:
 consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
 break;
 case CLUSTERING:
 consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
 break;
 default:
 throw new IllegalArgumentException("Property 'messageModel' was wrong.");
 }
?
 switch (selectorType) {
 case TAG:
 consumer.subscribe(topic, selectorExpression);
 break;
 case SQL92:
 consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
 break;
 default:
 throw new IllegalArgumentException("Property 'selectorType' was wrong.");
 }
?
 switch (consumeMode) {
 case ORDERLY:
 consumer.setMessageListener(new DefaultMessageListenerOrderly());
 break;
 case CONCURRENTLY:
 consumer.setMessageListener(new DefaultMessageListenerConcurrently());
 break;
 default:
 throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
 }
?
 if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
 ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
 }
?
 }
?
 private Class getMessageType() {
 Class<?> targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
 Type[] interfaces = targetClass.getGenericInterfaces();
 Class<?> superclass = targetClass.getSuperclass();
 while ((Objects.isNull(interfaces) || 0 == interfaces.length) && Objects.nonNull(superclass)) {
 interfaces = superclass.getGenericInterfaces();
 superclass = targetClass.getSuperclass();
 }
 if (Objects.nonNull(interfaces)) {
 for (Type type : interfaces) {
 if (type instanceof ParameterizedType) {
 ParameterizedType parameterizedType = (ParameterizedType) type;
 if (Objects.equals(parameterizedType.getRawType(), RocketMQListener.class)) {
 Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
 if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
 return (Class) actualTypeArguments[0];
 } else {
 return Object.class;
 }
 }
 }
 }
?
 return Object.class;
 } else {
 return Object.class;
 }
 }
?
 //......
}
  • DefaultRocketMQListenerContainer实现了InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware接口;setRocketMQMessageListener方法会根据RocketMQMessageListener注解的信息来设置consumeMode、consumeThreadMax、messageModel、selectorExpression、selectorType、consumeTimeout
  • afterPropertiesSet方法执行了initRocketMQPushConsumer及getMessageType方法;initRocketMQPushConsumer方法会根据rpcHook是否为null来创建不同的DefaultMQPushConsumer,之后根据messageModel、selectorType、consumeMode等来配置consumer;如果rocketMQListener类型是RocketMQPushConsumerLifecycleListener的,则执行RocketMQPushConsumerLifecycleListener的prepareStart方法
  • setupMessageListener方法主要是保存了rocketMQListener;isAutoStartup方法返回true;start方法主要是执行consumer.start()方法;stop及destroy方法主要是执行consumer.shutdown()

DefaultMessageListenerConcurrently

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

 public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
?
 @SuppressWarnings("unchecked")
 @Override
 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
 for (MessageExt messageExt : msgs) {
 log.debug("received msg: {}", messageExt);
 try {
 long now = System.currentTimeMillis();
 rocketMQListener.onMessage(doConvertMessage(messageExt));
 long costTime = System.currentTimeMillis() - now;
 log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
 } catch (Exception e) {
 log.warn("consume message failed. messageExt:{}", messageExt, e);
 context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
 return ConsumeConcurrentlyStatus.RECONSUME_LATER;
 }
 }
?
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 }
 }
  • DefaultMessageListenerConcurrently方法实现了MessageListenerConcurrently接口;它的consumeMessage方法使用for循环try catch执行rocketMQListener.onMessage(doConvertMessage(messageExt))回调,都成功返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,一旦异常则返回ConsumeConcurrentlyStatus.RECONSUME_LATER

DefaultMessageListenerOrderly

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

 public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
?
 @SuppressWarnings("unchecked")
 @Override
 public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
 for (MessageExt messageExt : msgs) {
 log.debug("received msg: {}", messageExt);
 try {
 long now = System.currentTimeMillis();
 rocketMQListener.onMessage(doConvertMessage(messageExt));
 long costTime = System.currentTimeMillis() - now;
 log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
 } catch (Exception e) {
 log.warn("consume message failed. messageExt:{}", messageExt, e);
 context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
 return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
 }
 }
?
 return ConsumeOrderlyStatus.SUCCESS;
 }
 }
  • DefaultMessageListenerOrderly实现了MessageListenerOrderly接口,其consumeMessage方法使用for循环try catch执行rocketMQListener.onMessage(doConvertMessage(messageExt))回调,都成功返回ConsumeOrderlyStatus.SUCCESS,一旦异常则返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT

小结

  • DefaultRocketMQListenerContainer实现了InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware接口;setRocketMQMessageListener方法会根据RocketMQMessageListener注解的信息来设置consumeMode、consumeThreadMax、messageModel、selectorExpression、selectorType、consumeTimeout
  • afterPropertiesSet方法执行了initRocketMQPushConsumer及getMessageType方法;initRocketMQPushConsumer方法会根据rpcHook是否为null来创建不同的DefaultMQPushConsumer,之后根据messageModel、selectorType、consumeMode等来配置consumer;如果rocketMQListener类型是RocketMQPushConsumerLifecycleListener的,则执行RocketMQPushConsumerLifecycleListener的prepareStart方法
  • setupMessageListener方法主要是保存了rocketMQListener;isAutoStartup方法返回true;start方法主要是执行consumer.start()方法;stop及destroy方法主要是执行consumer.shutdown()

doc

  • DefaultRocketMQListenerContainer

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表