专业的编程技术博客社区

网站首页 > 博客文章 正文

聊聊rocketmq的ListenerContainerConfiguration

baijin 2024-08-29 12:11:56 博客文章 3 ℃ 0 评论

本文主要研究一下rocketmq的ListenerContainerConfiguration

ListenerContainerConfiguration

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java

@Configuration
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
 private final static Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class);
?
 private ConfigurableApplicationContext applicationContext;
?
 private AtomicLong counter = new AtomicLong(0);
?
 private StandardEnvironment environment;
?
 private RocketMQProperties rocketMQProperties;
?
 private ObjectMapper objectMapper;
?
 public ListenerContainerConfiguration(ObjectMapper rocketMQMessageObjectMapper,
 StandardEnvironment environment,
 RocketMQProperties rocketMQProperties) {
 this.objectMapper = rocketMQMessageObjectMapper;
 this.environment = environment;
 this.rocketMQProperties = rocketMQProperties;
 }
?
 @Override
 public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
 this.applicationContext = (ConfigurableApplicationContext) applicationContext;
 }
?
 @Override
 public void afterSingletonsInstantiated() {
 Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
?
 if (Objects.nonNull(beans)) {
 beans.forEach(this::registerContainer);
 }
 }
?
 private void registerContainer(String beanName, Object bean) {
 Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
?
 if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
 throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
 }
?
 RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
 validate(annotation);
?
 String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
 counter.incrementAndGet());
 GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
?
 genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
 () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
 DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
 DefaultRocketMQListenerContainer.class);
 if (!container.isRunning()) {
 try {
 container.start();
 } catch (Exception e) {
 log.error("Started container failed. {}", container, e);
 throw new RuntimeException(e);
 }
 }
?
 log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
 }
?
 private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) {
 DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
?
 String nameServer = environment.resolvePlaceholders(annotation.nameServer());
 nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
 String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
 container.setNameServer(nameServer);
 if (!StringUtils.isEmpty(accessChannel)) {
 container.setAccessChannel(AccessChannel.valueOf(accessChannel));
 }
 container.setTopic(environment.resolvePlaceholders(annotation.topic()));
 container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
 container.setRocketMQMessageListener(annotation);
 container.setRocketMQListener((RocketMQListener) bean);
 container.setObjectMapper(objectMapper);
 container.setName(name); // REVIEW ME, use the same clientId or multiple?
?
 return container;
 }
?
 private void validate(RocketMQMessageListener annotation) {
 if (annotation.consumeMode() == ConsumeMode.ORDERLY &&
 annotation.messageModel() == MessageModel.BROADCASTING) {
 throw new BeanDefinitionValidationException(
 "Bad annotation definition in @RocketMQMessageListener, messageModel BROADCASTING does not support ORDERLY message!");
 }
 }
}
  • ListenerContainerConfiguration实现了ApplicationContextAware、SmartInitializingSingleton接口
  • 其setApplicationContext方法保存了applicationContext;其afterSingletonsInstantiated方法会获取标注了RocketMQMessageListener注解的bean,然后挨个执行registerContainer
  • registerContainer方法首先判断该bean是否是RocketMQListener的实现类,不是则抛出IllegalStateException;接着获取RocketMQMessageListener注解的信息,判断是否设置了不支持的属性;之后通过createRocketMQListenerContainer创建DefaultRocketMQListenerContainer并注册到applicationContext,然后对于没有running的container执行start方法

RocketMQMessageListener

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {
?
 String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
 String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
 String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
 String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
 String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
?
 /**
 * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
 * load balance. It's required and needs to be globally unique.
 *
 *
 * See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
 */
 String consumerGroup();
?
 /**
 * Topic name.
 */
 String topic();
?
 /**
 * Control how to selector message.
 *
 * @see SelectorType
 */
 SelectorType selectorType() default SelectorType.TAG;
?
 /**
 * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
 */
 String selectorExpression() default "*";
?
 /**
 * Control consume mode, you can choice receive message concurrently or orderly.
 */
 ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
?
 /**
 * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
 */
 MessageModel messageModel() default MessageModel.CLUSTERING;
?
 /**
 * Max consumer thread number.
 */
 int consumeThreadMax() default 64;
?
 /**
 * Max consumer timeout, default 30s.
 */
 long consumeTimeout() default 30000L;
?
 /**
 * The property of "access-key".
 */
 String accessKey() default ACCESS_KEY_PLACEHOLDER;
?
 /**
 * The property of "secret-key".
 */
 String secretKey() default SECRET_KEY_PLACEHOLDER;
?
 /**
 * Switch flag instance for message trace.
 */
 boolean enableMsgTrace() default true;
?
 /**
 * The name value of message trace topic.If you don't config,you can use the default trace topic name.
 */
 String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
?
 /**
 * The property of "name-server".
 */
 String nameServer() default NAME_SERVER_PLACEHOLDER;
?
 /**
 * The property of "access-channel".
 */
 String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
}
  • RocketMQMessageListener注解定义了consumerGroup、topic、selectorType、selectorExpression、consumeMode、messageModel、consumeThreadMax、consumeTimeout、accessKey、secretKey、enableMsgTrace、customizedTraceTopic、nameServer、accessChannel属性

小结

  • ListenerContainerConfiguration实现了ApplicationContextAware、SmartInitializingSingleton接口
  • 其setApplicationContext方法保存了applicationContext;其afterSingletonsInstantiated方法会获取标注了RocketMQMessageListener注解的bean,然后挨个执行registerContainer
  • registerContainer方法首先判断该bean是否是RocketMQListener的实现类,不是则抛出IllegalStateException;接着获取RocketMQMessageListener注解的信息,判断是否设置了不支持的属性;之后通过createRocketMQListenerContainer创建DefaultRocketMQListenerContainer并注册到applicationContext,然后对于没有running的container执行start方法

doc

  • ListenerContainerConfiguration

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表