专业的编程技术博客社区

网站首页 > 博客文章 正文

rocketmq延迟消息实现原理(上)

baijin 2024-12-12 11:01:19 博客文章 7 ℃ 0 评论

什么是延迟消息

延迟消息是指,生产者发送一条消息到到消息队列后并不期望这条消息马上会被消费者消费到,而是期望到了指定的时间(消息延迟一定时间),可以认为定时到当前时间加上一定的延迟时间。消费者才可以消费到这条消息。

这篇文章为什么说是实现原理(上)呢,因为rocketmq4版本针对延迟消息只支持默认等级的,不支持精确任意秒级的延迟或定时消息,在rocketmq5版本中支持定时到任意秒级时间的定时消息,所以这篇文章叫上,下步再继续讲解支持定时任意秒级时间的原理。

应用场景

在电子商务中,如果提交订单,可以发送延迟消息,30分钟后可以查看订单状态。如果订单仍未付款,则可以取消订单并释放库存。

发送延迟消息

发送延迟的消息代码示例,如下代码块所示:

// 实例化一个生产者来产生消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
 // 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
Message message = new Message("TestTopic", ("Hello scheduled message ").getBytes());
 // 设置延时等级
message.setDelayTimeLevel(5);
 // 发送消息 
 producer.send(message);  
 // 关闭生产者
 producer.shutdown();

可以看到相对于发送普通消息,增加了设置延迟等级这个属性,这个延迟等级是个int类型的属性,在发送的消息中是放到消息的扩展属性中的,key:DELAY,value:xx(等级值,如上面的5)

 // Message的扩展属性
private Map<String, String> properties;


 public void setDelayTimeLevel(int level) {
    this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
 }

rocketmq目前支持的延迟等级是固定的,写在消息存储配置类(org.apache.rocketmq.store.config.MessageStoreConfig)中的一个属性,如下代码所示,可以看到支持18个等级的延迟设置(从1s~2h)。

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

上面发送代码中设置的等级5对应的就是延迟1m的设置,具体等级的设置与延迟的时间对应关系,是在rocketmq启动的时候就会加载解析上面的配置信息,进行延迟等级与时间的映射,具体有两块地方用到这个配置,分别是消息存储与定时调度。

延迟消息存储

解析延迟时间代码是org.apache.rocketmq.store.DefaultMessageStore中,核心代码如下:

// 存储延迟级别与时间的映射
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
        new ConcurrentHashMap<>(32);
 // 解析上面的延迟时间,放入到上面的delayLevelTable中
public boolean parseDelayLevel() {
        HashMap<String, Long> timeUnitTable = new HashMap<>();
        timeUnitTable.put("s", 1000L);
        timeUnitTable.put("m", 1000L * 60);
        timeUnitTable.put("h", 1000L * 60 * 60);
        timeUnitTable.put("d", 1000L * 60 * 60 * 24);
        String levelString = messageStoreConfig.getMessageDelayLevel();
        try {
            String[] levelArray = levelString.split(" ");
            for (int i = 0; i < levelArray.length; i++) {
                String value = levelArray[i];
                String ch = value.substring(value.length() - 1);
                Long tu = timeUnitTable.get(ch);
                int level = i + 1;
                if (level > this.maxDelayLevel) {
                    this.maxDelayLevel = level;
                }
                long num = Long.parseLong(value.substring(0, value.length() - 1));
                long delayTimeMillis = tu * num;
                this.delayLevelTable.put(level, delayTimeMillis);
            }
        } catch (Exception e) {
            LOGGER.error("parse message delay level failed. messageDelayLevel = {}", levelString, e);
            return false;
        }
        return true;
    }

消息发送到broker后,消息进行存储org.apache.rocketmq.store.DefaultMessageStore#putMessage在消息存储方法中,默认注册了3个hook前置处理,针对延迟消息处理,我们主要看下面这个hook方法的处理,代码如下

public static PutMessageResult handleScheduleMessage(BrokerController brokerController,
        final MessageExtBrokerInner msg) {
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
             // 其他......
            // 处理延迟消息逻辑
            if (msg.getDelayTimeLevel() > 0) {
                transformDelayLevelMessage(brokerController, msg);
            }
        }
        return null;
    }
public static void transformDelayLevelMessage(BrokerController brokerController, MessageExtBrokerInner msg) {
        // 超过最大的延迟等级,设置为最大的等级,即延迟2h
        if (msg.getDelayTimeLevel() > brokerController.getScheduleMessageService().getMaxDelayLevel()) {
            msg.setDelayTimeLevel(brokerController.getScheduleMessageService().getMaxDelayLevel());
        }
        // 把真实的topic与队列id放到扩展属性中,进行记录
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
        // 设置延迟消息topic为 SCHEDULE_TOPIC_XXXX
        msg.setTopic(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC);
        // 设置延迟消息队列id为延迟等级-1
        msg.setQueueId(ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()));
}

上面的代码执行,总结如下:

  1. 如果设置的延迟等级大于最大的延迟等级,设置为最大的;
  2. 把延迟消息的真实topic与queueId放到消息的扩展属性中;
  3. 把延迟消息的topic用默认的SCHEDULE_TOPIC_XXXX进行替换
  4. queueId设置为延迟级别-1;
  5. 写入到commitlog日志中,及异步写入到对应的18个consumequeue(这里的消息队列的topic是默认的SCHEDULE_TOPIC_XXXX,tagscode是通过存储的时间进行计算),后续由下面的定时任务进行消息这些消息队列。经典的消息存储结构如下所示。

  6. 注意:这个时候由于存储的消息及消费队列并不是真实的topic及queueId,所以此时是无法被消息者所消费的,只能通过定时调度进行消费。

定时调度

定时消息服务类org.apache.rocketmq.broker.schedule.ScheduleMessageService,启动方法中,加载配置解析代码如下:

 // 存储延迟级别与时间的映射
 private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
        new ConcurrentHashMap<>(32);
 /**
   * 启动方法
   */    
 public void start() {
         // cas操作,进行启动后标记
        if (started.compareAndSet(false, true)) {
            // 调用加载防范
            this.load();
            this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
            if (this.enableAsyncDeliver) {
                this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
            }
            // 遍历延迟等级与时间的映射关系
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                Integer level = entry.getKey();
                Long timeDelay = entry.getValue();
                Long offset = this.offsetTable.get(level);
                if (null == offset) {
                    offset = 0L;
                }
                if (timeDelay != null) {
                    if (this.enableAsyncDeliver) {
                        this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
                    }
                    // 为每个延迟时间创建一个的定时任务(DeliverDelayedMessageTimerTask是个线程)
                    this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
                }
            }
             // 持久化操作 
            scheduledPersistService.scheduleAtFixedRate(() -> {
                try {
                    ScheduleMessageService.this.persist();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }, 10000, this.brokerController.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
        }
}
    
/**
  * 加载方法 
  */
 @Override
  public boolean load() {
     boolean result = super.load();
     result = result && this.parseDelayLevel();
     result = result && this.correctDelayOffset();
     return result;
}


/**
  * 解析配置的延迟时间与等级进行映射
 */
public boolean parseDelayLevel() {
        HashMap<String, Long> timeUnitTable = new HashMap<>();
        timeUnitTable.put("s", 1000L);
        timeUnitTable.put("m", 1000L * 60);
        timeUnitTable.put("h", 1000L * 60 * 60);
        timeUnitTable.put("d", 1000L * 60 * 60 * 24);


        String levelString = this.brokerController.getMessageStoreConfig().getMessageDelayLevel();
        try {
            String[] levelArray = levelString.split(" ");
            for (int i = 0; i < levelArray.length; i++) {
                String value = levelArray[i];
                String ch = value.substring(value.length() - 1);
                Long tu = timeUnitTable.get(ch);
                int level = i + 1;
                if (level > this.maxDelayLevel) {
                    this.maxDelayLevel = level;
                }
                long num = Long.parseLong(value.substring(0, value.length() - 1));
                long delayTimeMillis = tu * num;
                this.delayLevelTable.put(level, delayTimeMillis);
                if (this.enableAsyncDeliver) {
                    this.deliverPendingTable.put(level, new LinkedBlockingQueue<>());
                }
            }
        } catch (Exception e) {
            log.error("parse message delay level failed. messageDelayLevel = {}", levelString, e);
            return false;
        }
        return true;
    }

核心类org.apache.rocketmq.broker.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask

class DeliverDelayedMessageTimerTask implements Runnable {
        // 延迟级别
        private final int delayLevel;
        // 偏移量
        private final long offset;


        public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
            this.delayLevel = delayLevel;
            this.offset = offset;
        }


        @Override
        public void run() {
            try {
                if (isStarted()) {
                    // 核心执行逻辑
                    this.executeOnTimeUp();
                }
            } catch (Exception e) {
                // XXX: warn and notify me
                log.error("ScheduleMessageService, executeOnTimeUp exception", e);
                this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD);
            }
        }

这里我们主要看执行方法executeOnTimeUp,该方法执行逻辑步骤总结:

  1. 根据默认的延迟消息topic与延迟队列id获取消息消费队列,如果未找到,说明并没有当前延迟的消息,则根据延时级别创建下一次调度任务;
  2. 根据offset从消息消费队列中获取当前队列中所有有效的消息,如果未找到,则更新一下延迟队列定时拉取进度并创建定时任务待下一次继续尝试。
  3. 遍历ConsumeQueue,每一个标准ConsumeQueue条目为20个字节,解析出消息的物理偏移量、消息长度、消息taggcode,为从commitlog加载具体的消息做准备。
  4. 根据消息物理偏移量与消息大小从commitlog文件中查找消息。如果未找到消息,打印错误日志,根据延迟时间创建下一个定时器。
  5. 根据消息重新构建新的消息对象,清除消息的延迟级别属性(delayLevel)、并从消息属性中恢复消息真实的消息主题与消息消费队列,消息的消费次数reconsumeTimes并不会丢失。
  6. 将消息再次存入到commitlog中,并异步转发到对应的消息队列上,此时消费者可以进行消息消费。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表