一、概述
Windows如何安装部署rocketMq和rocketMq-console
springboot集成rocketmq,发送消息和监听消息案例
rocketmq-spring-boot-starter源码分析
springboot集成rocketmq如何往多个集群发送消息
二、解析
2.1、name-server架构
1、name-server集群之间互不通信,其中一台name-server宕机不影响其他name-server
2、broker会向所有name-server注册,每隔30秒向所有的name-server发送心跳,心跳中包含了自身的topic路由信息
3、producer会随机选择一台name-server,并与之建立长连接,每隔30秒从name-server获取topic的最新队列信息【所以会有30秒的延迟】,得到broker的ip和端口后,连接broker(也是长连接)
4、consumer会随机选择一台name-server,并与之建立长连接,每隔30秒从name-server获取topic的最新队列信息【所以会有30秒的延迟】,得到broker的ip和端口后,连接broker(也是长连接)
5、name-server每隔10秒,会自动扫描所有存活的broker连接,如果连接的最后更新时间与当前时间差值超过120秒,则会将之剔除,但这一过程不会通知生产者和消费者
2.2、name-server作用
保存所有活跃的 broker 列表,包括 Master 和 Slave
保存所有 topic 和该 topic 所有队列的列表
为生产者和消费者提供路由信息,可以看作是整个mq集群的协调者
2.3、name-server源码
这里使用的是:4.3.0 版本:https://gitee.com/apache/rocketmq
NamesrvStartup类:
public static void main(String[] args) {
main0(args);
controllerManagerMain();
}
public static NamesrvController main0(String[] args) {
try {
// 解析配置文件,和命令行参数(-c和-p等)
parseCommandlineAndConfigFile(args);
// 创建NamesrvController,并启动
NamesrvController controller = createAndStartNamesrvController();
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
public static NamesrvController createAndStartNamesrvController() throws Exception {
// 创建name-server,主要是填充namesrvConfig、nettyServerConfig、nettyClientConfig等参数
NamesrvController controller = createNamesrvController();
// 启动NamesrvController
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
// 启动成功后会输出成功日志信息
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
}
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// 启动NamesrvController时,调用NamesrvController的初始化方法
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
controller.shutdown();
return null;
}));
// 启动NamesrvController
controller.start();
return controller;
}
// NamesrvController初始化
public boolean initialize() {
// 加载KVConfigManager配置
loadConfig();
// 初始化NettyServer网络组件实例
initiateNetworkComponents();
// 初始化线程池
initiateThreadExecutors();
// 向RemotingServer注册ClientRequestProcessor客户端请求处理器
registerProcessor();
// 开启几个定时任务
// 每隔10秒扫描一次broker,移除不活跃的broker【scanNotActiveBroker】
// 每隔10min打印一次KV配置
startScheduleService();
// 初始化SsL上下文
initiateSslContext();
// 初始化ZoneRouteRPCHook,用于区域路由
initiateRpcHooks();
return true;
}
public void start() throws Exception {
// 启动NettyRemotingServer服务端
this.remotingServer.start();
// In test scenarios where it is up to OS to pick up an available port, set the listening port back to config
if (0 == nettyServerConfig.getListenPort()) {
nettyServerConfig.setListenPort(this.remotingServer.localListenPort());
}
// 更新name-server地址列表
this.remotingClient.updateNameServerAddressList(Collections.singletonList(RemotingUtil.getLocalAddress()
+ ":" + nettyServerConfig.getListenPort()));
// 启动NettyRemotingClient客户端
this.remotingClient.start();
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
this.routeInfoManager.start();
}
本文暂时没有评论,来添加一个吧(●'◡'●)