专业的编程技术博客社区

网站首页 > 博客文章 正文

RocketMQ源码分析之消息自定义匹配过滤及过滤服务器管理组件分析

baijin 2024-08-12 13:27:34 博客文章 19 ℃ 0 评论

#头条创作挑战赛#

一、前言

上一篇我们分析了过滤器ExpressionMessageFilter,以及其中对标签匹配和SQL 匹配的过滤原理进行了分析,这一篇我就来分析一下自定义匹配;

二、RocketMQ三种消息过滤类型

RocketMQ 客户端在消费消息的时候有三种消息过滤类型:

  1. 标签匹配;
  2. SQL 匹配;
  3. 自定义匹配;

1、标签匹配

consumer.subscribe("TopicTest", "TagA | TagB | TagC");

2、SQL 匹配

consumer.subscribe("TopicTest",
                MessageSelector.bySql(
                    "(TAGS is not null and TAGS in ('TagA', 'TagB'))" + 
                "and (a is not null and a between 0  3)"));

3、自定义匹配

客户端实现 MessageFilter 类,自定义过滤逻辑:

ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
File classFile = new File(classLoader.getResource("MessageFilterImpl.java").getFile());

String filterCode = MixAll.file2String(classFile);
consumer.subscribe("TopicTest", "org.apache.rocketmq.example.filter.MessageFilterImpl",filterCode);

对于 MessageFilter 类实现 match 方法即可:

public class MessageFilterImpl implements MessageFilter {

    @Override
    public boolean match(MessageExt msg, FilterContext context) {
        String property = msg.getProperty("SequenceId");
        if (property != null) {
            int id = Integer.parseInt(property);
            if (((id % 10) == 0) &&
                (id > 100)) {
                return true;
            }
        }

        return false;
    }
    
}

三、过滤服务器

  1. 构造方法;
  2. 创建指定数量过滤服务器;
  3. 注册过滤服务器;
  4. 扫描失活过滤服务器;
  5. 连接异常事件处理;

整个流程如下图所示:

1、构造方法

Broker 服务器在收到来自过滤服务器的注册信息之后,会把过滤服务器的地址信息、注册时间等放到过滤服务器表中;

public class FilterServerManager {

    public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000; // 30s
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

    // 过滤服务器数据表,网络连接->过滤服务器的映射关系
    private final ConcurrentMap<Channel, FilterServerInfo> filterServerTable =
            new ConcurrentHashMap<Channel, FilterServerInfo>(16);
    private final BrokerController brokerController;
    // 调度线程池组件
    private ScheduledExecutorService scheduledExecutorService = Executors
            .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FilterServerManagerScheduledThread"));

    public FilterServerManager(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }
}
static class FilterServerInfo {
    // 过滤服务器的地址信息
    private String filterServerAddr;
    // 注册时间
    private long lastUpdateTimestamp;
}

2、创建指定数量过滤服务器

在启动 Broker 服务器的时候,如果指定了下面一行设置:

brokerConfig.setFilterServerNums(int filterServerNums);

即将过滤服务器的数量设定为大于 0,那么 Broker 服务器在启动的时候,将会启动 filterServerNums过滤服务器。过滤服务器是通过调用 shell 命令的方式,启用独立进程进行启动的。

public void start() {
    // 启动一个定时调度线程,每隔30s跑一次,创建过滤服务器
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                FilterServerManager.this.createFilterServer();
            } catch (Exception e) {
                log.error("", e);
            }
        }
    }, 1000 * 5, 1000 * 30, TimeUnit.MILLISECONDS);
}
public void createFilterServer() {
    // 根据配置找到过滤服务器数量,减去实际过滤服务器数量,差值就是要创建的数量
    int more = this.brokerController.getBrokerConfig().getFilterServerNums() - this.filterServerTable.size();
    String cmd = this.buildStartCommand(); // 获取一个filter server进程的启动命令
    for (int i = 0; i < more; i++) {
        FilterServerUtil.callShell(cmd, log); // 调用shell命令,拉起来一个一个的filter server
    }
}
private String buildStartCommand() {
    String config = "";
    if (BrokerStartup.configFile != null) {
        config = String.format("-c %s", BrokerStartup.configFile);
    }

    if (this.brokerController.getBrokerConfig().getNamesrvAddr() != null) {
        config += String.format(" -n %s", this.brokerController.getBrokerConfig().getNamesrvAddr());
    }

    if (RemotingUtil.isWindowsPlatform()) {
        return String.format("start /b %s\\bin\\mqfiltersrv.exe %s",
            this.brokerController.getBrokerConfig().getRocketmqHome(),
            config);
    } else {
        return String.format("sh %s/bin/startfsrv.sh %s",
            this.brokerController.getBrokerConfig().getRocketmqHome(),
            config);
    }
}

3、注册过滤服务器

// 当filter server进程启动之后,他会主动来跟我的broker建立网络连接以及发起注册
public void registerFilterServer(final Channel channel, final String filterServerAddr) {
    // 根据channel从缓存中查询出过滤的server
    FilterServerInfo filterServerInfo = this.filterServerTable.get(channel);
    if (filterServerInfo != null) {
        // 更新最近一次更新时间
        filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
    } else {
        // 构建FilterServerInfo放入filterServerTable
        filterServerInfo = new FilterServerInfo();
        filterServerInfo.setFilterServerAddr(filterServerAddr);
        filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
        this.filterServerTable.put(channel, filterServerInfo);
        log.info("Receive a New Filter Server<{}>", filterServerAddr);
    }
}

4、扫描失活过滤服务器

// filter server失活扫描
public void scanNotActiveChannel() {

    Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<Channel, FilterServerInfo> next = it.next();
        long timestamp = next.getValue().getLastUpdateTimestamp();
        Channel channel = next.getKey();
        if ((System.currentTimeMillis() - timestamp) > FILTER_SERVER_MAX_IDLE_TIME_MILLS) {
            log.info("The Filter Server<{}> expired, remove it", next.getKey());
            it.remove();
            RemotingUtil.closeChannel(channel);
        }
    }
}

5、连接异常事件处理

// 发生了连接异常事件,就在这里进行处理
public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
    FilterServerInfo old = this.filterServerTable.remove(channel);
    if (old != null) {
        log.warn("The Filter Server<{}> connection<{}> closed, remove it", old.getFilterServerAddr(),
            remoteAddr);
    }
}

四、过滤类

当消费者通过使用自定义匹配过滤消息的时候,这个时候会将存储订阅信息的 SubscriptionData 中的 filterClassSource 设置为 true,以表征这个客户端需要过滤类来进行消息的匹配和过滤。

消费者客户端在启动过程中,还会定时地上传本地的过滤类源码到过滤服务器:

public class MQClientInstance {

    private void startScheduledTask() {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                }
            }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
    }

    public void sendHeartbeatToAllBrokerWithLock() {
        // ...
        this.uploadFilterClassSource();
    }
    
}

其中过滤服务器的地址列表是在从 Namesrv 服务器获取话题路由信息的时候取得的,话题路由信息不光存储了消息队列数据,还存储了各个 Broker 所关联的过滤服务器列表:

public class TopicRouteData extends RemotingSerializable {
    // ...
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

当过滤服务器接收到来自消费者客户端的源码之后,其会首先首先生成一个键为 话题@组 的字符串来查阅过滤类信息是否已经存在于内存里面的 filterClassTable 表中且文件通过 CRC 校验。如果没有存在或校验失败,那么就需要先编译并加载这个类:

public class DynaCode {

    public void compileAndLoadClass() throws Exception {
        String[] sourceFiles = this.uploadSrcFile();
        this.compile(sourceFiles);
        this.loadClass(this.loadClass.keySet());
    }
    
}

默认情况下,编译后的类存放于 $HOME/rocketmq_filter_class/$PID 目录下,类的源文件和类的字节码文件名也会相应地加上当前时间戳来确定:

上述流程图如下:

五、过滤消息

当消费者客户端启用自定义匹配过滤消息后,发往服务器的数据中也包含了过滤标志位,这样每次拉取消息的服务器也由原来的 Broker 服务器变更为 Filtersrv 过滤服务器,其中过滤服务器地址的选择是随机确定的:

public class PullAPIWrapper {

    public PullResult pullKernelImpl(final MessageQueue mq, /** 其它参数 **/) throws Exception {
        // ...
        if (findBrokerResult != null) {

            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                // 从过滤服务器拉取消息
                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }

            // ...
        }
    }
    
}

过滤服务器在启动的时候,内部还启动了一个 PullConsumer 客户端,用于从 Broker 服务器拉取消息:

public class FiltersrvController {

    private final DefaultMQPullConsumer defaultMQPullConsumer =
        new DefaultMQPullConsumer(MixAll.FILTERSRV_CONSUMER_GROUP);

    public void start() throws Exception {
        this.defaultMQPullConsumer.start();
        // ...
    }
    
}

当过滤服务器收到真正的消费者发来的消费消息的请求之后,其会委托内部的 PullConsumer 使用包含在请求体内的偏移量去 Broker 服务器拉取所有消息,此时这些消息是完全没有过滤掉的:

public class DefaultRequestProcessor implements NettyRequestProcessor {

    private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx,
                                               final RemotingCommand request) throws Exception {

        MessageQueue mq = new MessageQueue();
        
        mq.setTopic(requestHeader.getTopic());
        mq.setQueueId(requestHeader.getQueueId());
        mq.setBrokerName(this.filtersrvController.getBrokerName());

        // 设置偏移量和最大数量
        long offset = requestHeader.getQueueOffset();
        int maxNums = requestHeader.getMaxMsgNums();

        // 委托内部消费者从 Broker 服务器拉取消息
        pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);
        
    }
    
}

过滤服务器从 Broker 服务器获取到完整的消息列表之后,会遍历消息列表,然后使用过滤类一一进行匹配,最终将匹配成功的消息列表返回给客户端:

public class DefaultRequestProcessor implements NettyRequestProcessor {

    private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx,
                                               final RemotingCommand request) throws Exception {
        final PullCallback pullCallback = new PullCallback() {

                @Override
                public void onSuccess(PullResult pullResult) {
                    switch (pullResult.getPullStatus()) {
                    case FOUND:
                        List<MessageExt> msgListOK = new ArrayList<MessageExt>();
                        for (MessageExt msg : pullResult.getMsgFoundList()) {
                            // 使用过滤类过滤消息
                            boolean match = findFilterClass.getMessageFilter().match(msg, filterContext);
                            if (match) {
                                msgListOK.add(msg);
                            }
                        }
                        break;
                        // ...
                    }

                }

            };

        // ...
    }
    
}

上述流程如下图所示:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表