一、前言
上一篇我们分析了过滤器ExpressionMessageFilter,以及其中对标签匹配和SQL 匹配的过滤原理进行了分析,这一篇我就来分析一下自定义匹配;
二、RocketMQ三种消息过滤类型
RocketMQ 客户端在消费消息的时候有三种消息过滤类型:
- 标签匹配;
- SQL 匹配;
- 自定义匹配;
1、标签匹配
consumer.subscribe("TopicTest", "TagA | TagB | TagC");
2、SQL 匹配
consumer.subscribe("TopicTest",
MessageSelector.bySql(
"(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 3)"));
3、自定义匹配
客户端实现 MessageFilter 类,自定义过滤逻辑:
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
File classFile = new File(classLoader.getResource("MessageFilterImpl.java").getFile());
String filterCode = MixAll.file2String(classFile);
consumer.subscribe("TopicTest", "org.apache.rocketmq.example.filter.MessageFilterImpl",filterCode);
对于 MessageFilter 类实现 match 方法即可:
public class MessageFilterImpl implements MessageFilter {
@Override
public boolean match(MessageExt msg, FilterContext context) {
String property = msg.getProperty("SequenceId");
if (property != null) {
int id = Integer.parseInt(property);
if (((id % 10) == 0) &&
(id > 100)) {
return true;
}
}
return false;
}
}
三、过滤服务器
- 构造方法;
- 创建指定数量过滤服务器;
- 注册过滤服务器;
- 扫描失活过滤服务器;
- 连接异常事件处理;
整个流程如下图所示:
1、构造方法
Broker 服务器在收到来自过滤服务器的注册信息之后,会把过滤服务器的地址信息、注册时间等放到过滤服务器表中;
public class FilterServerManager {
public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000; // 30s
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
// 过滤服务器数据表,网络连接->过滤服务器的映射关系
private final ConcurrentMap<Channel, FilterServerInfo> filterServerTable =
new ConcurrentHashMap<Channel, FilterServerInfo>(16);
private final BrokerController brokerController;
// 调度线程池组件
private ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FilterServerManagerScheduledThread"));
public FilterServerManager(final BrokerController brokerController) {
this.brokerController = brokerController;
}
}
static class FilterServerInfo {
// 过滤服务器的地址信息
private String filterServerAddr;
// 注册时间
private long lastUpdateTimestamp;
}
2、创建指定数量过滤服务器
在启动 Broker 服务器的时候,如果指定了下面一行设置:
brokerConfig.setFilterServerNums(int filterServerNums);
即将过滤服务器的数量设定为大于 0,那么 Broker 服务器在启动的时候,将会启动 filterServerNums 个过滤服务器。过滤服务器是通过调用 shell 命令的方式,启用独立进程进行启动的。
public void start() {
// 启动一个定时调度线程,每隔30s跑一次,创建过滤服务器
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
FilterServerManager.this.createFilterServer();
} catch (Exception e) {
log.error("", e);
}
}
}, 1000 * 5, 1000 * 30, TimeUnit.MILLISECONDS);
}
public void createFilterServer() {
// 根据配置找到过滤服务器数量,减去实际过滤服务器数量,差值就是要创建的数量
int more = this.brokerController.getBrokerConfig().getFilterServerNums() - this.filterServerTable.size();
String cmd = this.buildStartCommand(); // 获取一个filter server进程的启动命令
for (int i = 0; i < more; i++) {
FilterServerUtil.callShell(cmd, log); // 调用shell命令,拉起来一个一个的filter server
}
}
private String buildStartCommand() {
String config = "";
if (BrokerStartup.configFile != null) {
config = String.format("-c %s", BrokerStartup.configFile);
}
if (this.brokerController.getBrokerConfig().getNamesrvAddr() != null) {
config += String.format(" -n %s", this.brokerController.getBrokerConfig().getNamesrvAddr());
}
if (RemotingUtil.isWindowsPlatform()) {
return String.format("start /b %s\\bin\\mqfiltersrv.exe %s",
this.brokerController.getBrokerConfig().getRocketmqHome(),
config);
} else {
return String.format("sh %s/bin/startfsrv.sh %s",
this.brokerController.getBrokerConfig().getRocketmqHome(),
config);
}
}
3、注册过滤服务器
// 当filter server进程启动之后,他会主动来跟我的broker建立网络连接以及发起注册
public void registerFilterServer(final Channel channel, final String filterServerAddr) {
// 根据channel从缓存中查询出过滤的server
FilterServerInfo filterServerInfo = this.filterServerTable.get(channel);
if (filterServerInfo != null) {
// 更新最近一次更新时间
filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
} else {
// 构建FilterServerInfo放入filterServerTable
filterServerInfo = new FilterServerInfo();
filterServerInfo.setFilterServerAddr(filterServerAddr);
filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
this.filterServerTable.put(channel, filterServerInfo);
log.info("Receive a New Filter Server<{}>", filterServerAddr);
}
}
4、扫描失活过滤服务器
// filter server失活扫描
public void scanNotActiveChannel() {
Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Channel, FilterServerInfo> next = it.next();
long timestamp = next.getValue().getLastUpdateTimestamp();
Channel channel = next.getKey();
if ((System.currentTimeMillis() - timestamp) > FILTER_SERVER_MAX_IDLE_TIME_MILLS) {
log.info("The Filter Server<{}> expired, remove it", next.getKey());
it.remove();
RemotingUtil.closeChannel(channel);
}
}
}
5、连接异常事件处理
// 发生了连接异常事件,就在这里进行处理
public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
FilterServerInfo old = this.filterServerTable.remove(channel);
if (old != null) {
log.warn("The Filter Server<{}> connection<{}> closed, remove it", old.getFilterServerAddr(),
remoteAddr);
}
}
四、过滤类
当消费者通过使用自定义匹配过滤消息的时候,这个时候会将存储订阅信息的 SubscriptionData 中的 filterClassSource 设置为 true,以表征这个客户端需要过滤类来进行消息的匹配和过滤。
消费者客户端在启动过程中,还会定时地上传本地的过滤类源码到过滤服务器:
public class MQClientInstance {
private void startScheduledTask() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
}
public void sendHeartbeatToAllBrokerWithLock() {
// ...
this.uploadFilterClassSource();
}
}
其中过滤服务器的地址列表是在从 Namesrv 服务器获取话题路由信息的时候取得的,话题路由信息不光存储了消息队列数据,还存储了各个 Broker 所关联的过滤服务器列表:
public class TopicRouteData extends RemotingSerializable {
// ...
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
当过滤服务器接收到来自消费者客户端的源码之后,其会首先首先生成一个键为 话题@组 的字符串来查阅过滤类信息是否已经存在于内存里面的 filterClassTable 表中且文件通过 CRC 校验。如果没有存在或校验失败,那么就需要先编译并加载这个类:
public class DynaCode {
public void compileAndLoadClass() throws Exception {
String[] sourceFiles = this.uploadSrcFile();
this.compile(sourceFiles);
this.loadClass(this.loadClass.keySet());
}
}
默认情况下,编译后的类存放于 $HOME/rocketmq_filter_class/$PID 目录下,类的源文件和类的字节码文件名也会相应地加上当前时间戳来确定:
上述流程图如下:
五、过滤消息
当消费者客户端启用自定义匹配过滤消息后,发往服务器的数据中也包含了过滤标志位,这样每次拉取消息的服务器也由原来的 Broker 服务器变更为 Filtersrv 过滤服务器,其中过滤服务器地址的选择是随机确定的:
public class PullAPIWrapper {
public PullResult pullKernelImpl(final MessageQueue mq, /** 其它参数 **/) throws Exception {
// ...
if (findBrokerResult != null) {
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
// 从过滤服务器拉取消息
brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}
// ...
}
}
}
过滤服务器在启动的时候,内部还启动了一个 PullConsumer 客户端,用于从 Broker 服务器拉取消息:
public class FiltersrvController {
private final DefaultMQPullConsumer defaultMQPullConsumer =
new DefaultMQPullConsumer(MixAll.FILTERSRV_CONSUMER_GROUP);
public void start() throws Exception {
this.defaultMQPullConsumer.start();
// ...
}
}
当过滤服务器收到真正的消费者发来的消费消息的请求之后,其会委托内部的 PullConsumer 使用包含在请求体内的偏移量去 Broker 服务器拉取所有消息,此时这些消息是完全没有过滤掉的:
public class DefaultRequestProcessor implements NettyRequestProcessor {
private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx,
final RemotingCommand request) throws Exception {
MessageQueue mq = new MessageQueue();
mq.setTopic(requestHeader.getTopic());
mq.setQueueId(requestHeader.getQueueId());
mq.setBrokerName(this.filtersrvController.getBrokerName());
// 设置偏移量和最大数量
long offset = requestHeader.getQueueOffset();
int maxNums = requestHeader.getMaxMsgNums();
// 委托内部消费者从 Broker 服务器拉取消息
pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);
}
}
过滤服务器从 Broker 服务器获取到完整的消息列表之后,会遍历消息列表,然后使用过滤类一一进行匹配,最终将匹配成功的消息列表返回给客户端:
public class DefaultRequestProcessor implements NettyRequestProcessor {
private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx,
final RemotingCommand request) throws Exception {
final PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> msgListOK = new ArrayList<MessageExt>();
for (MessageExt msg : pullResult.getMsgFoundList()) {
// 使用过滤类过滤消息
boolean match = findFilterClass.getMessageFilter().match(msg, filterContext);
if (match) {
msgListOK.add(msg);
}
}
break;
// ...
}
}
};
// ...
}
}
上述流程如下图所示:
本文暂时没有评论,来添加一个吧(●'◡'●)