网站首页 > 博客文章 正文
本文以 RocketMQ 为例,介绍了长连接和长轮询的基础概念以及代码示例,并结合 RocketMQ 代码讲解了这两项技术的实际运用,以及 RocketMQ 采用这两个技术的背景。
长连接
什么是长连接
TCP 本身没有长连接短连接的概念,一个连接没有被关闭,就算是「长连接」,如果接收完请求响应就关闭,则为「短连接」
下面是一个基于 ServerSocket 的网络程序,程序监听 8081 端口,将输入倒转后输出到客户端。
public static void main(String[] args) throws Exception {
try (ServerSocket serverSocket = new ServerSocket(8081)) {
Socket socket = serverSocket.accept();
socket.setKeepAlive(true);
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
while (true) {
String s = in.readLine();
System.out.println("received:" + s + " at:" + System.currentTimeMillis());
out.println(StringUtils.reverse(s));
}
}
}
执行程序,然后通过 telnet 127.0.0.1 8081 与应用程序连接,通过 Wireshark 抓包结果如下。
Wireshark 抓包结果
可以看到除了前三个是用来创建连接的,其余都是每次请求和响应的数据包。
长连接在 RocketMQ 中的应用
长连接主要应用在同 name server 的通信上,包括 producer 向 name server 请求路由信息、broker 向 name server 同步心跳、producer/consumer 向 broker 请求等。
心跳机制
在长连接的保持过程中,服务端需要发现并清理无用客户端连接,在 TCP 中,通过 keepalive 机制实现。
在 Linux 中,默认每隔 7200s 会由服务端向客户端发送探活包,如果收到 RST 回包,则认为客户端已经断开连接,服务端也会将此连接断开,以节省资源。
我们可以通过 netstat -altpno 命令打印距下次发送探活包的剩余时间。
keepalive 机制探活时间
TCP 的 keepalive 机制并不适用于 RocketMQ,原因如下:
- 探活时间过长。虽然可以修改,但这是操作系统层面的修改,影响全局。
- 网络通不代表客户端存活。当客户端因 GC 频繁、CPU 负载高等原因已经无法对外提供访问了,但此时网络仍然是通的。
在 RocketMQ 中,更多的是通过应用层的心跳来实现连接保持的功能。
拿 broker 向 name server 发送心跳包来说,broker 每隔 30s 请求一次 name server,name server 也会定期清理长时间(默认 120s)不发送请求的 broker。
下面是 RocketMQ 启动网络的 Netty 代码,可以看到直接禁用了 TCP 的 keepalive 机制,并且利用 IdleStateHandler 做心跳检测的补充。
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
// 禁用 TCP 的 keepalive 机制。
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
// 启用心跳检测 Handler。
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
长连接优势
长连接适用于点对点的,操作频繁的场景。
对于 RocketMQ 来说再合适不过了,RocketMQ 中各组件相对稳定,路由关系比较确定。
RocketMQ 需要频繁地发送和接收消息,各个组件之间通过心跳沟通。符合操作频繁场景。
长轮询
什么是长轮询
轮询(Polling)简单来说就是定时地查询某个外部系统的状态,一般等到变更到特定状态时,轮询结束。
// client code
while (true) {
String status = sendRequest(param);
if (Objects.equals(status, "ok") {
break;
} else {
TimeUnit.SECONDS.sleep(1);
}
}
Long polling is itself not a true push; long polling is a variation of the traditional polling technique, but it allows emulating a push mechanism under circumstances where a real push is not possible. — wiki
长轮询(Long polling)是轮询的一种变体,通过服务端的操作,将 pull 操作模拟成 push 操作。
// server code
long deadline = pollTime + System.currentTimeMillis();
while (true) {
if (System.currentTimeMillis() < deadline) {
Result result = getResult(param);
if (result != null) {
return result;
} else {
TimeUnit.SECONDS.sleep(1);
}
} else {
return null;
}
}
长轮询在 RocketMQ 中的应用
长轮询主要用于 consumer 和 broker 的连接上。
RocketMQ 支持推拉两种消费模式,其中推模式是建立在拉模式上的。
在拉模式下,consumer 向 broker 发起拉取数据请求,携带允许暂停时间(默认 30s)请求。broker 通过 PullMessageProcessor#processRequest 处理请求,通过 consume queue 查找 commitlog 数据,如果没有找到且允许长轮询,则将请求放入 PullRequestHoldService ,以 topic 和 queue 为区分值,放入到 List 里,默认每隔 5s 再次处理请求,如果获取到数据,返回。否则再次进入 List,直到获取到数据或者超过最大允许暂停时间。
RocketMQ 长轮询流程
长轮询优势
长轮询机制可以减少 consumer 和 broker 的连接请求数,减少消息时延,提升消息实时率。
总结
- RocketMQ 中默认所有请求都是长连接的,除非手动关闭连接。
- RocketMQ 中长连接采用应用间心跳方式续租。
- 长轮询模式支撑了推拉模式,减少消息时延。
猜你喜欢
- 2024-09-15 python轻松抓取app接口(python抓app数据)
- 2024-09-15 spring boot 中设置 端口的 两种方式
- 2024-09-15 教你用 Python 操控你的上网请求(如何用python语言进行网络爬虫的开发)
- 2024-09-15 Node.js-第一个应用程序(nodejs应用开发)
- 2024-09-15 Linux(CentOS 7)下安装配置nginx代理多个tomcat实例和应用
- 2024-09-15 Iptables 最佳实践(iptables-j)
- 2024-09-15 nginx应用 | 反向代理,统一鉴权,目录重定向
- 2024-09-15 Docker-08-企业级私有仓库(k8s docker私有仓库搭建)
- 2024-09-15 Spring Boot中更改默认端口的方法
- 2024-09-15 Nginx基于TCP/UDP端口的四层负载均衡(stream模块)配置梳理
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- powershellfor (55)
- messagesource (56)
- aspose.pdf破解版 (56)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- ubuntu升级gcc (58)
- nacos启动失败 (64)
- ssh-add (70)
- jwt漏洞 (58)
- macos14下载 (58)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- vue回到顶部 (57)
- qcombobox样式表 (68)
- vue数组concat (56)
- tomcatundertow (58)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)