专业的编程技术博客社区

网站首页 > 博客文章 正文

从实战开始,深入了解Netty组件?ByteBuf原理?「Netty系列」

baijin 2024-10-03 17:38:51 博客文章 5 ℃ 0 评论

如果零基础学习Netty可以查看前面文章。上文对IO模型和Reactor模型进行讲解,是不是感觉有点懵懵的。哈哈哈,反正我并没有对其有深入见解。我是这样安慰自己的,知识在不断地反复学习和思考中有新的感悟。本篇文章想来从实战开始,带我深入了解Netty各个组件是做什么?ByteBuf执行原理又是怎样的?


?一 第一个Netty实例


用Netty实现通信。说白了就是客户端向服务端发消息,服务端接收消息并给客户端响应。所以我来看看服务端和客户端是如何实现的?


1.1 服务端


1. 依赖


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.haopt.iot</groupId>
    <artifactId>first-netty</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.50.Final</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>


2. 服务端-MyRPCServer


package com.haopt.netty.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MyRPCServer {
    public void start(int port) throws Exception {
        // 主线程,不处理任何业务逻辑,只是接收客户的连接请求
        EventLoopGroup boss = new NioEventLoopGroup(1);
        // 工作线程,线程数默认是:cpu核数*2
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            // 服务器启动类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker) //设置线程组
                    .channel(NioServerSocketChannel.class)  //配置server通道
                    .childHandler(new MyChannelInitializer()); //worker线程的处理器
            //ByteBuf 的分配要设置为非池化,否则不能切换到堆缓冲区模式
            serverBootstrap.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
            ChannelFuture future = serverBootstrap.bind(port).sync();
            System.out.println("服务器启动完成,端口为:" + port);
            //等待服务端监听端口关闭
            future.channel().closeFuture().sync();
        } finally {
            //优雅关闭
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}


3. 服务端-ChannelHandler


package com.haopt.netty.server.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class MyChannelHandler extends ChannelInboundHandlerAdapter {
    /**
    * 获取客户端发来的数据
    * @param ctx
    * @param msg
    * @throws Exception
    */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        String msgStr = byteBuf.toString(CharsetUtil.UTF_8);
        System.out.println("客户端发来数据:" + msgStr);
        //向客户端发送数据
        ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
    }
    
    /**
    * 异常处理
    * @param ctx
    * @param cause
    * @throws Exception
    */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}


4. 测试用例


package com.haopt.netty.myrpc;
import com.haopt.netty.server.MyRPCServer;
import org.junit.Test;
public class TestServer {
    @Test
    public void testServer() throws Exception{
        MyRPCServer myRPCServer = new MyRPCServer();
        myRPCServer.start(5566);
    }
}



1.2 客户端



1. 客户端-client


package com.haopt.netty.client;
import com.haopt.netty.client.handler.MyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class MyRPCClient {
    public void start(String host, int port) throws Exception {
        //定义?作线程组
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            //注意:client使?的是Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(worker)
            .channel(NioSocketChannel.class) //注意:client使?的是NioSocketChannel
            .handler(new MyClientHandler());
            //连接到远程服务
            ChannelFuture future = bootstrap.connect(host, port).sync();
            future.channel().closeFuture().sync();
        } finally {
             worker.shutdownGracefully();
        }
    }
}


2. 客户端-(ClientHandler)


package com.haopt.netty.client.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("接收到服务端的消息:" +
        msg.toString(CharsetUtil.UTF_8));
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 向服务端发送数据
        String msg = "hello";
        ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}


相信代码执行起来没有任何问题(如果有任何问题反应交流)。但是对上面代码为何这样实现有很多疑。嘿嘿,我也是奥。接下来我们对这些代码中用到的组件进行介绍,希望能消除之前疑虑。如果还是不能,可以把疑问写于留言处,嘿嘿,我也不一定会有个好的解答,但是大佬总会有的。


二 Netty核心组件


我们都知道Netty是基于事件驱动。但是事件发生后,Netty的各个组件都做了什么?来看看下面内容!


2.1 Channel


1. 初识Channel

a 可以理解为socket连接,客户端和服务端连接的时候会创建一个channel。负责基本的IO操作,例如:bind()、connect()、read()、write()。

b Netty的Channel接口所提供的API,大大减少了Socket类复杂性


2. 常见Channel(不同的协议和阻塞类型的连接会有不同的Channel类型与之对应)


a NioSocketChannel,NIO的客户端 TCP Socket 连接。
b NioServerSocketChannel,NIO的服务器端 TCP Socket 连接。
c NioDatagramChannel, UDP 连接。
d NioSctpChannel,客户端 Sctp 连接。
e NioSctpServerChannel,Sctp 服务器端连接,这些通道涵盖了UDP和TCP?络IO以及?件IO。


2.2 EventLoopGroup、EventLoop


1. 概述

有了Channel连接服务,连接之间消息流动。服务器发出消息称为出站,服务器接受消息称为入站。那么消息出站和入站就产生了事件例如:连接已激活;数据读取;用户事件;异常事件;打开连接;关闭连接等等。有了事件,有了事件就需要机制来监控和协调事件,这个机制就是EventLoop。


2. 初识EventLoopGroup、EventLoop






对上图解释


a 一个EventLoopGroup包含一个或者多个EventLoop
b 一个EventLoop在生命周期内之和一个Thread绑定
c EventLoop上所有的IO事件在它专有的Thread上被处理。
d Channel在它生命周期只注册于一个Event Loop
e 一个Event Loop可能被分配给一个或者多个Channel


3. 代码实现


// 主线程,不处理任何业务逻辑,只是接收客户的连接请求
EventLoopGroup boss = new NioEventLoopGroup(1);
// ?作线程,线程数默认是:cpu*2
EventLoopGroup worker = new NioEventLoopGroup();


2.3 ChannelHandler


1. 初识ChannelHandler


对于数据的出站和入栈的业务逻辑都是在ChannelHandler中。


2. 对于出站和入站对应的ChannelHandler






ChannelInboundHandler ?站事件处理器
ChannelOutBoundHandler 出站事件处理器



3. 开发中常用的ChannelHandler(ChannelInboundHandlerAdapter、SimpleChannelInboundHandler)





SimpleChannelInboundHandler的源码(是ChannelInboundHandlerAdapter子类)




注意


两者的区别在于,前者不会释放消息数据的引?,?后者会释放消息数据的引?。


2.4 ChannelPipeline


1. 初识ChannelPipeline


将ChannelHandler串起来。一个Channel包含一个ChannelPipeline,而ChannelPipeline维护者一个ChannelHandler列表。
ChannelHandler与Channel和ChannelPipeline之间的映射关系,由ChannelHandlerContext进?维护。



ChannelHandler按照加?的顺序会组成?个双向链表,?站事件从链表的head往后传递到最后?个ChannelHandler。
出站事件从链表的tail向前传递,直到最后?个ChannelHandler,两种类型的ChannelHandler相互不会影响。


2.5 Bootstrap


1. 初识Bootstrap


是引导作用,配置整个netty程序,将各个组件串起来,最后绑定接口,启动服务。


2. Bootstrap两种类型(Bootstrap、ServerBootstrap)


客户端只需要一个EventLoopGroup,服务端需要两个EventLoopGroup。



上图解释


与ServerChannel相关联的EventLoopGroup 将分配?个负责为传?连接请求创建 Channel 的EventLoop。
?旦连接被接受,第?个 EventLoopGroup 就会给它的 Channel 分配?个 EventLoop。


2.6 Future


1. 初识


操作完成时通知应用程序的方式。这个对象可以看做异步操作执行结果占位符,它在将来某个时刻完成,并提供对其结果的访问。


2. ChannelFuture的由来


JDK 预置了 interface java.util.concurrent.Future,但是其所提供的实现,只允许?动检查对应的操作是否已经完成,或者?直阻塞直到它完成。这是?常
繁琐的,所以 Netty 提供了它??的实现--ChannelFuture,?于在执?异步操作的时候使?。


3. Netty为什么完全是异步?


a ChannelFuture提供了?种额外的?法,这些?法使得我们能够注册?个或者多个  ChannelFutureListener实例。
b 监听器的回调?法operationComplete(),将会在对应的操作完成时被调?。然后监听器可以判断该操作是成功地完成了还是出错了。
c 每个 Netty 的出站 I/O 操作都将返回?个 ChannelFuture,也就是说,它们都不会阻塞。所以说,Netty完全是异步和事件驱动的。


2.7 组件小结



上图解释


将组件串起来


三 缓存区-ByteBuf


ByteBuf是我们开发中代码操作最多部分和出现问题最多的一部分。比如常见的TCP协议通信的粘包和拆包解决,和ByteBuf密切相关。后面文章会详细分析,先不展开。我们这里先了解ByteBuf的常用API和执行内幕。


3.1 ByteBuf概述


1. 初识ByteBuf


JavaNIO提供了缓存容器(ByteBuffer),但是使用复杂。因此netty引入缓存ButeBuf,
一串字节数组构成。


2. ByteBuf两个索引(readerIndex,writerIndex)


a readerIndex 将会根据读取的字节数递增
b writerIndex 也会根据写?的字节数进?递增
注意:如果readerIndex超过了writerIndex的时候,Netty会抛出IndexOutOf-BoundsException异常。



3.2 ByteBuf基本使用


1. 读取


package com.haopt.netty.myrpc.test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
public class TestByteBuf01 {
    public static void main(String[] args) {
        //构造
        ByteBuf byteBuf = Unpooled.copiedBuffer("hello world",
        CharsetUtil.UTF_8);
        System.out.println("byteBuf的容量为:" + byteBuf.capacity());
        System.out.println("byteBuf的可读容量为:" + byteBuf.readableBytes());
        System.out.println("byteBuf的可写容量为:" + byteBuf.writableBytes());
        while (byteBuf.isReadable()){ //?法?:内部通过移动readerIndex进?读取
         System.out.println((char)byteBuf.readByte());
        }
        //?法?:通过下标直接读取
        for (int i = 0; i < byteBuf.readableBytes(); i++) {
         System.out.println((char)byteBuf.getByte(i));
        }
        //?法三:转化为byte[]进?读取
        byte[] bytes = byteBuf.array();
        for (byte b : bytes) {
        System.out.println((char)b);
        }
    }
}


2. 写入


package com.haopt.netty.myrpc.test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
public class TestByteBuf02 {
    public static void main(String[] args) {
        //构造空的字节缓冲区,初始??为10,最?为20
        ByteBuf byteBuf = Unpooled.buffer(10,20);
        System.out.println("byteBuf的容量为:" + byteBuf.capacity());
        System.out.println("byteBuf的可读容量为:" + byteBuf.readableBytes());
        System.out.println("byteBuf的可写容量为:" + byteBuf.writableBytes());
        for (int i = 0; i < 5; i++) {
         byteBuf.writeInt(i); //写?int类型,?个int占4个字节
        }
        System.out.println("ok");
        System.out.println("byteBuf的容量为:" + byteBuf.capacity());
        System.out.println("byteBuf的可读容量为:" + byteBuf.readableBytes());
        System.out.println("byteBuf的可写容量为:" + byteBuf.writableBytes());
        while (byteBuf.isReadable()){
         System.out.println(byteBuf.readInt());
        }
    }
}


3. 丢弃已读字节




package com.haopt.netty.myrpc.test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
public class TestByteBuf03 {
    public static void main(String[] args) {
        ByteBuf byteBuf = Unpooled.copiedBuffer("hello world",CharsetUtil.UTF_8);
        System.out.println("byteBuf的容量为:" + byteBuf.capacity());
        System.out.println("byteBuf的可读容量为:" + byteBuf.readableBytes());
        System.out.println("byteBuf的可写容量为:" + byteBuf.writableBytes());
        while (byteBuf.isReadable()){
         System.out.println((char)byteBuf.readByte());
        }
        byteBuf.discardReadBytes(); //丢弃已读的字节空间
        System.out.println("byteBuf的容量为:" + byteBuf.capacity());
        System.out.println("byteBuf的可读容量为:" + byteBuf.readableBytes());
        System.out.println("byteBuf的可写容量为:" + byteBuf.writableBytes());
    }
}



4. clear()



package com.haopt.netty.myrpc.test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
public class TestByteBuf04 {
    public static void main(String[] args) {
        ByteBuf byteBuf = Unpooled.copiedBuffer("hello world",CharsetUtil.UTF_8);
        System.out.println("byteBuf的容量为:" + byteBuf.capacity());
        System.out.println("byteBuf的可读容量为:" + byteBuf.readableBytes());
        System.out.println("byteBuf的可写容量为:" + byteBuf.writableBytes());
        byteBuf.clear(); //重置readerIndex 、 writerIndex 为0
        System.out.println("byteBuf的容量为:" + byteBuf.capacity());
        System.out.println("byteBuf的可读容量为:" + byteBuf.readableBytes());
        System.out.println("byteBuf的可写容量为:" + byteBuf.writableBytes());
    }
}



3.3 ByteBuf 使?模式


3.3.1 根据存放缓冲区,分为三类


1. 堆缓存区(HeapByteBuf)

内存的分配和回收速度?较快,可以被JVM?动回收,缺点是,如果进?socket的IO读写,需要额外做?次内存复制,将堆内存对应的缓冲区复制到内核Channel中,性能会有?定程度的下降。由于在堆上被 JVM 管理,在不被使?时可以快速释放。可以通过 ByteBuf.array() 来获取 byte[] 数据。


2. 直接缓存区(DirectByteBuf)

?堆内存,它在对外进?内存分配,相?堆内存,它的分配和回收速度会慢?些,但是将它写?或从Socket Channel中读取时,由于减少了?次内存拷?,速度?堆内存块。



3. 复合缓存区

顾名思义就是将上述两类缓冲区聚合在?起。Netty 提供了?个 CompsiteByteBuf,可以将堆缓冲区和直接缓冲区的数据放在?起,让使?更加?便。



3.3.2 缓存区选择


Netty默认使?的是直接缓冲区(DirectByteBuf),如果需要使?堆缓冲区(HeapByteBuf)模式,则需要进?系统参数的设置。


//netty中IO操作都是基于Unsafe完成的
System.setProperty("io.netty.noUnsafe", "true"); 
//ByteBuf的分配要设置为?池化,否则不能切换到堆缓冲器模式
serverBootstrap.childOption(ChannelOption.ALLOCATOR,UnpooledByteBufAllocator.DEFAULT);


3.3.3 ByteBuf对象是否池化(Netty是默认池化的)


1. 池化化和非池化的实现


PooledByteBufAllocator,实现了ByteBuf的对象的池化,提?性能减少并最?限度地减少内存碎?。
UnpooledByteBufAllocator,没有实现对象的池化,每次会?成新的对象实例。


2. 代码实现(让Netty中ByteBuf对象不池化)


//通过ChannelHandlerContext获取ByteBufAllocator实例
ctx.alloc();
//通过channel也可以获取
channel.alloc();

//Netty默认使?了PooledByteBufAllocator
//可以在引导类中设置?池化模式
serverBootstrap.childOption(ChannelOption.ALLOCATOR,UnpooledByteBufAllocator.DEFAULT);
//或通过系统参数设置
System.setProperty("io.netty.allocator.type", "pooled");
System.setProperty("io.netty.allocator.type", "unpooled");


我在开发项目中,我一般不进行更改。因为我觉得池化效率更高。有其他高见,欢迎留言。


3.5 ByteBuf的释放


ByteBuf如果采?的是堆缓冲区模式的话,可以由GC回收,但是如果采?的是直接缓冲区,就不受GC的 管理,就得?动释放,否则会发?内存泄露。


3.5.1 ByteBuf的手动释放(一般不推荐使用,了解)


1. 实现逻辑


?动释放,就是在使?完成后,调?ReferenceCountUtil.release(byteBuf); 进?释放。通过release?法减去 byteBuf的使?计数,Netty 会?动回收 byteBuf。


2. 代码


/**
* 获取客户端发来的数据
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf byteBuf = (ByteBuf) msg;
    String msgStr = byteBuf.toString(CharsetUtil.UTF_8);
    System.out.println("客户端发来数据:" + msgStr);
    //释放资源
    ReferenceCountUtil.release(byteBuf);
}


注意:


?动释放可以达到?的,但是这种?式会?较繁琐,如果?旦忘记释放就可能会造成内存泄露。


3.5.1 ByteBuf的自动释放


?动释放有三种?式,分别是:?站的TailHandler、继承SimpleChannelInboundHandler、 HeadHandler的出站释放。


1. TailHandler


Netty的ChannelPipleline的流?线的末端是TailHandler,默认情况下如果每个?站处理器Handler都把消息往下传,TailHandler会释放掉ReferenceCounted类型的消息。


/**
* 获取客户端发来的数据
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf byteBuf = (ByteBuf) msg;
    String msgStr = byteBuf.toString(CharsetUtil.UTF_8);
    System.out.println("客户端发来数据:" + msgStr);
    //向客户端发送数据
    ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
    ctx.fireChannelRead(msg); //将ByteBuf向下传递
}


源码:


在DefaultChannelPipeline中的TailContext内部类会在最后执?


@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
 onUnhandledInboundMessage(ctx, msg);
}
//最后会执?
protected void onUnhandledInboundMessage(Object msg) {
  try {
      logger.debug(
      "Discarded inbound message {} that reached at the tail of the
      pipeline. " + "Please check your pipeline configuration.", msg);
  } finally {
    ReferenceCountUtil.release(msg); //释放资源
  }
}


2. SimpleChannelInboundHandler


当ChannelHandler继承了SimpleChannelInboundHandler后,在SimpleChannelInboundHandler的channelRead()?法中,将会进?资源的释放。


SimpleChannelInboundHandler的源码


//SimpleChannelInboundHandler中的channelRead()
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    boolean release = true;
    try {
      if (acceptInboundMessage(msg)) {
        @SuppressWarnings("unchecked")
        I imsg = (I) msg;
        channelRead0(ctx, imsg);
      } else {
        release = false;
        ctx.fireChannelRead(msg);
      }
    } finally {
      if (autoRelease && release) {
       ReferenceCountUtil.release(msg); //在这?释放
      }
    }
}


我们handler代码编写:


package com.haopt.myrpc.client.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
      System.out.println("接收到服务端的消息:" +
      msg.toString(CharsetUtil.UTF_8));
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
      // 向服务端发送数据
      String msg = "hello";
      ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      cause.printStackTrace();
      ctx.close();
    }
}


3. 堆缓冲区(HeadHandler)


出站处理流程中,申请分配到的ByteBuf,通过HeadHandler完成?动释放。


在出站流程开始的时候,通过调?ctx.writeAndFlush(msg),Bytebuf缓冲区开始进?出站处理的pipeline流?线。


在每?个出站Handler中的处理完成后,最后消息会来到出站的最后?棒HeadHandler,再经过?轮复杂的调?,在flush完成后终将被release掉。


package com.haopt.myrpc.client.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        				System.out.println("接收到服务端的消息:" +
        				msg.toString(CharsetUtil.UTF_8));
        }
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
       					 // 向服务端发送数据
       					 String msg = "hello";
       					 ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       				 cause.printStackTrace();
       				 ctx.close();
        }
}



3.6 ByteBuf小结


a ?站流程中,如果对原消息不做处理,调ctx.fireChannelRead(msg) 把原消息往下传,由流?线最后?棒 TailHandler 完成?动释放。


b 如果截断了?站处理流?线,则继承SimpleChannelInboundHandler ,完成?站ByteBuf?动释放。


c 出站处理过程中,申请分配到的 ByteBuf,通过 HeadHandler 完成?动释放。


d ?站处理中,如果将原消息转化为新的消息ctx.fireChannelRead(newMsg)往下传,那必须把原消息release掉。


e ?站处理中,如果已经不再调? ctx.fireChannelRead(msg) 传递任何消息,也没有继承SimpleChannelInboundHandler 完成?动释放,那更要把原消息release掉。


下文预告


下篇文章更新Netty中的编码器和解码器相关内容。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表