网站首页 > 博客文章 正文
在本章中,我们将分析 RocketMQ 是如何 创建订阅组的。看看 RocketMQ 是如何实现的。
如何开启&关闭
在部署 RocketMQ 的 Broker 的时候,我们通常都会把 autoCreateSubscriptionGroup 设置为 false。一方面防止胡乱订阅,一方面也是为了后面的运维、统计。
命令参数
用法:sh mqadmin updateSubGroup -n 192.168.1.100:9876 -t shg
指令:updateSubGroup
代码入口:org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand
参数 | 是否必填 | 说明 |
-h | 否 | 打印帮助 |
-n | 是 | nameserve 服务地址列表,格式ip:port;ip:port;… |
-b | 否 | 如果 -c 为 空,则必填 broker 地址,表示订阅组建立在该 broker 上 |
-c | 否 | 如果 -b 为空,则必填 cluster 名称,表示 topic 建在该集群上。(集群可通过 clusterList 命令来查询) |
-d | 否 | 是否容许广播方式消费 |
-g | 是 | 订阅组名称 |
-i | 否 | 从哪个broker 开始消费 |
-m | 否 | 是否容许从队列的最小位置开始消费,默认会设置为 false。 |
-q | 否 | 消费失败的消息放到一个重试队列,每个订阅组配置几个重试队列 |
-r | 否 | 重试消费最大次数,超过则投递到死信队列,不再投递,并报警 |
-s | 否 | 消费功能是否开启 |
-w | 否 | 发现消息堆积后,将Consumer的消费请求重定向到另外一台Slave机器 |
-q | 否 | 重试队列的数量。默认是 1个队列 |
-r | 否 | 最大重试次数 |
-a | 否 | 是否通知有消费者实例变化 |
解析命令行参数入口
// RocketMQ 配置了 命令行的执行 shell 脚本入口。就是下面的 mqadmin.sh 这个文件
mqadmin.sh
// 解析命令行入口
org.apache.rocketmq.tools.command.MQAdminStartup#main0
// 设置 namesrvAddr 为全局变量。
if (commandLine.hasOption('n')) {
String namesrvAddr = commandLine.getOptionValue('n');
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
}
RequstCode
// 更新或者创建订阅住
public static final int UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200;
// Namesrv 获取注册到Name Server的所有Broker集群信息
public static final int GET_BROKER_CLUSTER_INFO = 106;
核心代码流程
只讨论集群下的。指定 broker 跟这个差不多
// 从 Name Server 获取 此集群下的 master 节点
Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
// broker 端缓存配置
this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfig(config);
// 需要立马持久化,防止断电等意外情况发生
this.persist();
核心数据结构
// 订阅组
private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
// 集群信息
public class ClusterInfo extends RemotingSerializable {
private HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
}
// RocketMQ 的broker中的主从关系是通过 brokerName 来绑定的。
// broker 信息
public class BrokerData implements Comparable<BrokerData> {
private String cluster;
// broker 名字
private String brokerName;
/**
* brokerId 为 0,表示该 broker 为 master
* broker address 这里究竟是什么?是:mq1101.jiandan.com:10911
*/
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
private final Random random = new Random();
}
哪里使用到订阅关系了呢
org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
// 确保订阅组存在
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
return response;
}
总结
- RocketMQ 的订阅关系就是存在 broker 并作持久化,等 Consumer 端消费消息的时候,校验一下。
- 好几篇 RocketMQ 的源码,我都在一直强调 数据结构,这个在 RocketMQ 很重要,把这些数据结构记住了,能方便我们理解 RocketMQ 更快。
- RocketMQ 通过从 运维端来阅读代码。会更加简单。也让大家了解到 RocketMQ 是多么的简单的。
关于我
前 去哪儿网 技术专家!混迹中间件职场8+年!分享各种Java中间件知识!
- 上一篇: RocketMQ - 如何实现顺序消息
- 下一篇: RocketMQ中的线程池是如何创建的?
猜你喜欢
- 2025-01-12 一次 RocketMQ 顺序消费延迟的问题定位
- 2025-01-12 线上问题之:RocketMq重复消费
- 2025-01-12 基于 RocketMQ 的分布式事务解决方案
- 2025-01-12 生产环境出现网络分区,RocketMQ集群表示毫无压力
- 2025-01-12 实战!如何从零搭建10万级 QPS 大流量、高并发优惠券系统
- 2025-01-12 RocketMQ的高性能网络通信机制竟然是如此实现?
- 2025-01-12 10 张图告诉你 RocketMQ 是怎样保存消息的
- 2025-01-12 RocketMQ系列(六)批量发送与过滤
- 2025-01-12 Canal集群快速搭建
- 2025-01-12 从放弃到入门:消息队列之RocketMQ
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- ifneq (61)
- 字符串长度在线 (61)
- messagesource (56)
- aspose.pdf破解版 (56)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- ubuntu升级gcc (58)
- nacos启动失败 (64)
- ssh-add (70)
- jwt漏洞 (58)
- macos14下载 (58)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- vue回到顶部 (57)
- qcombobox样式表 (68)
- tomcatundertow (58)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)