抛出问题 场景一
以前我们觉得web服务器可以让我们忽略如何编写HTTP/RPC服务器,但是处理的负载量和功能变化的速度在不断增加.超过了传统的三层体系结构的承受能力。
那咋办呢?拆分,集群,分布式加机器,加机器性能
那就有2个问题:
运行成本:如果我们将单节点的性能提高30%/100%。那可以节省多少机器?
延迟:当一次web浏览器查询请求出发了几十个跨不同机器的内部远程过程调用时,如何达到最低的延迟?
场景二
假设你去一家大型公司,开发一个新的关键任务应用项目。
然后第一次开发,讨论在不损失性能的情况下扩展到150000 个并发用户。
大家都看向了你:laughing:,你要怎么说?
没问题 :fist_left: respect
似乎也许大概可行
无法实现
大部分人可能会谨慎的回答第二种,然后会后霹雳巴拉google敲下:java 高性能 网络编程
如何学习 学习路径 找到关键字了,怎么学习?
如果你很强(扎实的网络编程经验,涉及网络/多线程/并发),你可能会选择->查看网站netty.io ->下载源码netty Github源码 ->阅读javadoc/blog->动手写代码
如果你刚学,那么可能需要一定的由浅入深,慢慢探索的过程。
首先: 不要怕,netty说到底也只是一个框架,可以理解为就是在NIO上面做了封装,性能更好,使用更方便,仅此而已。
其次:想要更深层次的理解Netty源码,可以系统学习一下Java 网络编程
、NIO
、并发
和异步编程
以及相关的设计模式
学习目标 学会Netty,构建所有可能的网络应用程序,从轻量级的HTTP服务器到高度定制化的RPC服务器
引言 现在互联网架构(分布式),一个优秀的网络通信框架是至关重要。如:Grizzly,XIO,MINA,Netty .
什么叫优秀?
性能、效率
能否屏蔽底层复杂度
编程模型是否简单易懂
是否适用更多的应用场景
社区是否活跃
=> Netty
Netty初步认知 Java网络编程 早期的网络编程开发需要学习复杂的C语言套接字库(socket),虽然java面向对象(object-oriented)封装了很多细节,但是还是要写很多代码,以及做很多研究才能流畅的运行起来一个client/server.而且早期的API(java.net)只支持持阻塞IO(blocking)(BIO)。
阻塞IO
上面这段代码存在2个问题:
会有大量的线程处理休眠状态,等待用户输入/输出,造成资源浪费
只能同时处理一个连接,要处理多个并发客户端.需要每个客户端Socket创建一个新的Thread.而创建Thread的调用栈占用内存(默认64Kb~1M)
即使JVM物理上可以支持很大的数量的线程,线程的上下文切换 (context-switching)的开销就会带来麻烦.比如10000个连接。
针对上面的问题,Java很早就提出了解决方案,即非阻塞IO(Non-blocking I/O)(NIO)
非阻塞IO(NIO)
java.nio.channels.Selector
选择器(多路复用器)使用时间通知API确定哪个Socket做IO读写,一个单一的线程便可以处理多个并发的连接(multiple concurrent connections)。
这样就解决了上面的问题
用较少的线程处理较多的连接。线程少了,内存管理,上下文切换的开销也小了。
如果没有IO读写,线程可以做其他的。
尽管Java NIO
是一种解决方案,但是要做到很正确,很安全不是一件容易的事情。
在高负载下和靠和高效地处理和调度I/O操作是一项繁琐、容易出错的事情,最好让高性能的网络编程框架来做=>Netty
为什么用netty?
直接用底层的JavaI/O api太麻烦,而且维护成本高,不符合面向对象的理念。
netty特性
不用JDK原生API而用netty的理由
JDK中NIO的一些API功能薄弱且复杂,Netty隔离了JDK中NIO的实现变化及实现细节譬如:ByteBuffer -> ByteBuf
主要负责从底层的IO中读取数据到ByteBuf,然后传递给应用程序,应用程序处理完之后封装为ByteBuf,写回给IO
使用JDK原生API需要对多线程要很熟悉 , 因为NIO涉及到Reactor设计模式,得对里面的原理要相当的熟悉
JDK原生方式要实现高可用,需要自己实现断路重连、半包读写、粘包处理、失败缓存处理等相关操作,而Netty则做的更多,它解决了传输的一些问题譬如粘包半包现象,它支持常用的应用层协议,完善的断路重连,idle等异常处理
JDK的NIO存在bug,如经典的epoll bug,会导致CPU 100%而Netty封装的更完善。
网络通信框架为什么非得是Netty
Apache Mina:和Netty是同一作者,但是推荐Netty,作者认为Netty是针对Mina的重新打造版本,解决了一些问题并提高了扩展性
Sun Grizzly:用得少、文档少,更新少。
Apple Swift NIO、ACE 等:其他语言不作考虑
Cindy 等:生命周期不长
Tomcat、Jetty:还没有独立出来,另外他们有自己的网络通信层实现,是为了专门针对servelet容器而做的,不具备通用性。
那tomcat在网络通信层为什么不选择Netty呢?主要是由于tomcat出现的比较早
谁在用Netty 社区活跃,大厂很多在用,apple,twitter,Facebook,Google,开源项目也很多底层用的netty。
数据库: Cassandra
大数据处理: Spark、Hadoop
Message Queue:RocketMQ
检索: Elasticsearch
框架:gRPC、Apache Dubbo、Spring5(响应式编程WebFlux)
分布式协调器:ZooKeeper
工具类: async-http-client
……..
使用到网络的一般都会用到netty
netty是什么 netty是一个异步(asynchronous)事件驱动(event-driven)的高性能网络应用框架
netty中的模型 netty中可以选择不同的IO模型和线程模型。
IO模型 Netty对三种IO的支持 :
线程模型 Reactor 线程模型 - 并发编程模型
Reactor线程模型不是Java专属,也不是Netty专属,它其实是一种并发编程模型,是一种思想,具有指导意义
Reactor模型中定义了三种角色:
Reactor :负责监听和分配事件,将I/O事件分派给对应的Handler。新的事件包含连接建立就绪、读就绪、写就绪等。
Acceptor :处理客户端新连接,并分派请求到处理器链中。
Handler :将自身与事件绑定,执行非阻塞读/写任务,完成channel的读入,完成处理业务逻辑后,负责将结果写出channel
单Reactor-单线程 NIO下Reactor单线程:所有的接收连接,处理数据的相关操作都在一个线程中来完成,性能上有瓶颈
1 2 3 4 比如read 读到数据后,要解码,做业务操作。比较耗时的。向网络中发送数据,要编码。 这时候,处理handler1花费了5s,而这期间socketchannel来了很多。就不能及时响应了。 不能让reactor一个人做这么多事情。 把耗时的操作给异步线程。这样reactor轮询的操作就会很快。
单Reactor-多线程 NIO下Reactor多线程:把比较耗时的数据的编解码运算操作放入线程池中来执行,提升了性能但还不是最好的方式
1 2 3 比如说有1w个channel注册到单reactor。而检测到5000个上有读事件。然后为这5000个channel做IO读事件。假如耗费10s.而这10s,其他的连接就有延迟。这就是吞吐量上不去。 建立连接这件事很重要,要和socketchannel拆开。开个单独的reactor处理serversocket连接。 然后处理socketchannel开多个reactor线程处理大量的channel。
主从Reactor-多线程 主从多线程,对于服务器来说,接收客户端的连接是比较重要的,因此将这部分操作单独用线程去操作
工作流程:
1 2 3 4 5 6 7 8 这种模式的基本工作流程为: 1)Reactor 主线程 MainReactor 对象通过 select 监听客户端连接事件,收到事件后,通过 Acceptor 处理客户端连接事件。 2)当 Acceptor 处理完客户端连接事件之后(与客户端建立好 Socket 连接),MainReactor 将连接分配给 SubReactor。(即:MainReactor 只负责监听客户端连接请求,和客户端建立连接之后将连接交由 SubReactor 监听后面的 IO 事件。) 3)SubReactor 将连接加入到自己的连接队列进行监听,并创建 Handler 对各种事件进行处理。 4)当连接上有新事件发生的时候,SubReactor 就会调用对应的 Handler 处理。 5)Handler 通过 read 从连接上读取请求数据,将请求数据分发给 Worker 线程池进行业务处理。 6)Worker 线程池会分配独立线程来完成真正的业务处理,并将处理结果返回给 Handler。Handler 通过 send 向客户端发送响应数据。 7)一个 MainReactor 可以对应多个 SubReactor,即一个 MainReactor 线程可以对应多个 SubReactor 线程
优势:
1 2 3 4 这种模式的优势如下: 1)MainReactor 线程与 SubReactor 线程的数据交互简单职责明确,MainReactor 线程只需要接收新连接, SubReactor 线程完成后续的业务处理。 2)MainReactor 线程与 SubReactor 线程的数据交互简单, MainReactor 线程只需要把新连接传给 SubReactor 线程,SubReactor 线程无需返回数据。 3)多个 SubReactor 线程能够应对更高的并发请求。 这种模式的缺点是编程复杂度较高。但是由于其优点明显,在许多项目中被广泛使用,包括 Nginx、 Memcached、Netty 等。 这种模式也被叫做服务器的 1+M+N 线程模式,即使用该模式开发的服务器包含一个(或多个,1 只是表示相对较少)连接建立线程+M 个 IO 线程+N 个业务处理线程。这是业界成熟的服务器程序设计模式。
Netty中的Reactor实现 Netty对Reactor的支持 :
Netty线程模型是基于Reactor模型实现的,对Reactor三种模式都有非常好的支持,并做了一定的改进,也非常的灵活,一般情况,在服务端会采用主从架构模型。
Netty中的Reactor实现 :
工作流程 :
1 2 3 4 5 6 7 8 9 10 11 1)Netty 抽象出两组线程池:BossGroup 和 WorkerGroup,每个线程池中都有EventLoop 线程(可以是OIO,NIO,AIO)。BossGroup 中的线程专门负责和客户端建立连接,WorkerGroup 中的线程专门负责处理连接上的读写, EventLoopGroup 相当于一个事件循环组, 这个组中含有多个事件循环 2)EventLoop 表示一个不断循环的执行事件处理的线程,每个EventLoop 都包含一个 Selector,用于监听注册在其上的 Socket 网络 连接(Channel)。 3)每个 Boss EventLoop 中循环执行以下三个步骤: 3.1)select:轮循注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件) 3.2)processSelectedKeys:处理 accept 事件,与客户端建立连接,生成一个SocketChannel,并将其注册到某个 Worker EventLoop 上的 Selector 上 3.3)runAllTasks:再去以此循环处理任务队列中的其他任务 4)每个 Worker EventLoop 中循环执行以下三个步骤: 4.1)select:轮训注册在其上的SocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件) 4.2)processSelectedKeys:在对应的SocketChannel 上处理 read/write 事件 4.3)runAllTasks:再去以此循环处理任务队列中的其他任务 5)在以上两个processSelectedKeys步骤中,会使用 Pipeline(管道),Pipeline 中引用了 Channel,即通过 Pipeline 可以获取到对 应的 Channel,Pipeline 中维护了很多的处理器(拦截处理器、过滤处理器、自定义处理器等)
Netty如何使用Reactor模式
代码实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 @Slf4j public class NettyServer { public static void main (String[] args) { NettyServer server = new NettyServer (); server.start(8888 ); } private void start (int port) { EventLoopGroup boss = new NioEventLoopGroup (1 ,new DefaultThreadFactory ("boss" )); EventLoopGroup worker = new NioEventLoopGroup (0 ,new DefaultThreadFactory ("worker" )); ServerInboundHandler2 handler2 = new ServerInboundHandler2 (); ExecutorService service = Executors.newFixedThreadPool(NettyRuntime.availableProcessors()*2 ); EventExecutorGroup business = new UnorderedThreadPoolEventExecutor (NettyRuntime.availableProcessors()*2 ,new DefaultThreadFactory ("business" )); try { ServerBootstrap serverBootstrap = new ServerBootstrap (); serverBootstrap.group(boss,worker) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler (LogLevel.INFO)) .option(ChannelOption.SO_BACKLOG,1024 ) .childOption(ChannelOption.SO_KEEPALIVE,true ) .childOption(ChannelOption.TCP_NODELAY,true ) .childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ServerReadIdleHandler ()); pipeline.addLast(new LengthFieldPrepender (4 )); pipeline.addLast("stringencoder" ,new StringEncoder ()); pipeline.addLast(new LengthFieldBasedFrameDecoder (65536 ,0 ,4 ,0 ,4 )); pipeline.addLast(new StringDecoder ()); pipeline.addLast(business,new TcpStickHalfHandler1 (service)); } }); ChannelFuture future = serverBootstrap.bind(port).sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 @Slf4j public class NettyClient { public static void main (String[] args) { NettyClient client = new NettyClient (); client.connect("127.0.0.1" ,8888 ); } private void connect (String host, int port) { EventLoopGroup group = new NioEventLoopGroup (); try { Bootstrap bootstrap = new Bootstrap (); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ClientWriterIdleHandler ()); pipeline.addLast(new LengthFieldPrepender (4 )); pipeline.addLast(new StringEncoder ()); pipeline.addLast(new LengthFieldBasedFrameDecoder (65536 ,0 ,4 ,0 ,4 )); pipeline.addLast(new StringDecoder ()); pipeline.addLast(new ClientInboundHandler1 ()); } }); ChannelFuture future = bootstrap.connect(host, port).sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } }
小结
1 2 3 4 5 6 1)Netty 的线程模型基于主从多Reactor模型。通常由一个线程负责处理OP_ACCEPT事件,拥有 CPU 核数的两倍的IO线程处理读写事件 2)一个通道的IO操作会绑定在一个IO线程中,而一个IO线程可以注册多个通道 3)在一个网络通信中通常会包含网络数据读写,编码、解码、业务处理。默认情况下网络数据读写,编码、解码等操作会在IO线程中 运行,但也可以指定其他线程池。 4)通常业务处理会单独开启业务线程池(看业务类型),但也可以进一步细化,例如心跳包可以直接在IO线程中处理,而需要再转发给业务线程池,避免线程切换 5)在一个IO线程中所有通道的事件是串行处理的。 6) 通常业务操作会专门开辟一个线程池,那业务处理完成之后,如何将响应结果通过 IO 线程写入到网卡中呢?业务线程调用 Channel 对象的 write 方法并不会立即写入网络,只是将数据放入一个待写入缓存区,然后IO线程每次执行事件选择后,会从待写入缓存区中获取写入任务,将数据真正写入到网络中
核心组件
学习这些组件,看他们是怎么提供事件通知,让网络上的消息得以被处理。
Channels Channel是JavaNIO的基础构成,表示到实体的开发连接(硬件设备/文件/socket/执行IO操作的程序…),比如读写。
可以把Channel看成是传入(inbound)/传出(outbound)的载体
Callbacks netty内部使用了回调(Callback)来处理事件,比如我们可以重写方法,自定义回调:当新连接创建时,打印出一条信息.
Futures 在操作完成时通知应用程序,相当于异步操作结果的占位符。在未来的某个时刻完成,并提供访问结果。
Java内置的接口 java.util.concurrent.Future
的实现只允许手动检查操作是否完成或者阻塞到它完成。不好用。
所以netty封装了ChannelFuture
我们可以往里注册监听器,监听的回调方法会判断操作时成功/失败,而不用手动检查操作是否完成。
上面的案例不会再阻塞等待响应了。等连接到远程节点后会监听检查状态并返回。这样就不用纠结什么时候处理的问题了。线程可以继续做其他事情。有效利用过了资源。
对于错误的处理时开发自己决定的。比如你可以再else中继续尝试建立到另一个远程节点的连接。
Events 和 handlers Netty用不同事件通知我们操作状态改变。这样就可以做响应的处理。比如:记录日志、转换数据、流控制、逻辑处理等。
inbound(入站)事件:连接已被激活、数据读取、用户事件、错误事件
outbound(出站)事件: 打开/关闭到远程节点的连接、将输入写入socket
上图可以理解为每个处理器(Channel Handler)都是对应事件(Event)的回调,netty封装了很多可以直接用的handler。
netty再内部会给每个Channel分配要给eventloop,可以处理所有事件。这样开发只需要关注提供正确的逻辑。不需要顾虑是否同步。
执行顺序:
回写数据时会经过哪些outboundHandler?
回写数据事件流转规则 1 2 3 如果是通过Channel对象进行数据回写,事件会从pipeline尾部流向头部 如果是通过ChannelHandlerContext对象进行数据回写,事件会从当前handler流向头部 问题:OutboundHandler和InboundHandler的先后顺序是否有要求?才能保证所有outboundHandler能被执行
如何让outboundHandler 一定能执行到?
如果想让所有的OutboundHandler都能被执行到,可以选择把OutboundHandler放在最后一个有效的InboundHandler之前
有一种做法是通过addFirst加载所有OutboundHandler,再通过addLast加载所有InboundHandler;另外也推荐:使用
addLast先加载所有OutboundHandler,然后加载所有InboundHandler(注意考虑加载顺序和执行顺序)
出站事件传播和outboundHandler中的数据修改
简单入门案例
客户端将消息发送给服务器,而服务器再将消息回送给客户端。
基础环境:java+maven+idea 搞起
这样一个简单的客户端和服务端互相通信就完成了,当然也可以达成jar包执行。
Netty入门学习 概述 Netty 是什么? 1 2 Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.
Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端
Netty 的作者
他还是另一个著名网络应用框架 Mina 的重要贡献者
Netty 的地位 Netty 在 Java 网络应用框架中的地位就好比:Spring 框架在 JavaEE 开发中的地位
以下的框架都使用了 Netty,因为它们有网络通信需求!
Cassandra - nosql 数据库
Spark - 大数据分布式计算框架
Hadoop - 大数据分布式存储框架
RocketMQ - ali 开源的消息队列
ElasticSearch - 搜索引擎
gRPC - rpc 框架
Dubbo - rpc 框架
Spring 5.x - flux api 完全抛弃了 tomcat ,使用 netty 作为服务器端
Zookeeper - 分布式协调框架
Netty 的优势
Netty vs NIO,工作量大,bug 多
需要自己构建协议
解决 TCP 传输问题,如粘包、半包
epoll 空轮询导致 CPU 100%
对 API 进行增强,使之更易用,如 FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer
Netty vs 其它网络应用框架
Mina 由 apache 维护,将来 3.x 版本可能会有较大重构,破坏 API 向下兼容性,Netty 的开发迭代更迅速,API 更简洁、文档更优秀
久经考验,16年,Netty 版本
2.x 2004
3.x 2008
4.x 2013
5.x 已废弃(没有明显的性能提升,维护成本高)
Netty 核心架构
核心:
可扩展的事件模型
统一的通信api,简化了通信编码
零拷贝机制与丰富的字节缓冲区
传输服务:
支持socket以及datagram(数据报)
http传输服务
In-VM Pipe (管道协议,是jvm的一种进程)
协议支持:
http 以及 websocket
SSL 安全套接字协议支持
Google Protobuf (序列化框架)
支持zlib、gzip压缩
支持大文件的传输
RTSP(实时流传输协议,是TCP/IP协议体系中的一个应 用层协议)
支持二进制协议并且提供了完整的单元测试
Hello World 目标 开发一个简单的服务器端和客户端
客户端向服务器端发送 hello, world
服务器仅接收,不返回
加入依赖
1 2 3 4 5 <dependency > <groupId > io.netty</groupId > <artifactId > netty-all</artifactId > <version > 4.1.39.Final</version > </dependency >
服务器端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 new ServerBootstrap () .group(new NioEventLoopGroup ()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer <NioSocketChannel>() { protected void initChannel (NioSocketChannel ch) { ch.pipeline().addLast(new StringDecoder ()); ch.pipeline().addLast(new SimpleChannelInboundHandler <String>() { @Override protected void channelRead0 (ChannelHandlerContext ctx, String msg) { System.out.println(msg); } }); } }) .bind(8080 );
代码解读
3 处,为啥方法叫 childHandler,是接下来添加的处理器都是给 SocketChannel 用的,而不是给 ServerSocketChannel。ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器
4 处,ServerSocketChannel 绑定的监听端口
5 处,SocketChannel 的处理器,解码 ByteBuf => String
6 处,SocketChannel 的业务处理器,使用上一个处理器的处理结果
客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 new Bootstrap () .group(new NioEventLoopGroup ()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <Channel>() { @Override protected void initChannel (Channel ch) { ch.pipeline().addLast(new StringEncoder ()); } }) .connect("127.0.0.1" , 8080 ) .sync() .channel() .writeAndFlush(new Date () + ": hello world!" );
代码解读
3 处,添加 SocketChannel 的处理器,ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器
4 处,指定要连接的服务器和端口
5 处,Netty 中很多方法都是异步的,如 connect,这时需要使用 sync 方法等待 connect 建立连接完毕
6 处,获取 channel 对象,它即为通道抽象,可以进行数据读写操作
7 处,写入消息并清空缓冲区
8 处,消息会经过通道 handler 处理,这里是将 String => ByteBuf
发出
数据经过网络传输,到达服务器端,服务器端 5 和 6 处的 handler 先后被触发,走完一个流程
流程梳理
提示💡
一开始需要树立正确的观念
把 channel 理解为数据的通道
把 msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline 的加工,会变成其它类型对象,最后输出又变成 ByteBuf
把 handler 理解为数据的处理工序
工序有多道,合在一起就是 pipeline,pipeline 负责发布事件(读、读取完成…)传播给每个 handler, handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
handler 分 Inbound 和 Outbound 两类
把 eventLoop 理解为处理数据的工人
工人可以管理多个 channel 的 io 操作,并且一旦工人负责了某个 channel,就要负责到底(绑定)
工人既可以执行 io 操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
工人按照 pipeline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每道工序指定不同的工人
组件 Bootstrap 作用和类型
Bootstrap是引导的意思,它的作用是配置整个Netty程序,将各个组件都串起来,最后绑定端口、启动 Netty服务
Netty中提供了2种类型的引导类,一种用于客户端(Bootstrap),而另一种(ServerBootstrap)用于服务器 ,区别在于:
1、ServerBootstrap 将绑定到一个端口,因为服务器必须要监听连接,而 Bootstrap 则是由想要连接 到远程节点的客户端应用程序所使用的
2、引导一个客户端只需要一个EventLoopGroup,但是一个ServerBootstrap则需要两个
EventLoop 作用和类型
Netty是基于事件驱动的,比如:连接注册,连接激活;数据读取;异常事件等等,有了事件,就需要一 个组件去监控事件的产生和事件的协调处理,这个组件就是EventLoop(事件循环/EventExecutor),在Netty 中每个Channel 都会被分配到一个 EventLoop。一个 EventLoop 可以服务于多个 Channel。每个EventLoop 会占用一个 Thread,同时这个 Thread 会处理 EventLoop 上面发生的所有 IO 操作和事件。
EventLoopGroup 是用来生成 EventLoop 的,包含了一组EventLoop(可以初步理解成Netty线程池)
事件循环对象
EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。
它的继承关系比较复杂
一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
另一条线是继承自 netty 自己的 OrderedEventExecutor,
提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
提供了 parent 方法来看看自己属于哪个 EventLoopGroup
事件循环组
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
继承自 netty 自己的 EventExecutorGroup
实现了 Iterable 接口提供遍历 EventLoop 的能力
另有 next 方法获取集合中下一个 EventLoop
以一个简单的实现为例:
1 2 3 4 5 DefaultEventLoopGroup group = new DefaultEventLoopGroup (2 );System.out.println(group.next()); System.out.println(group.next()); System.out.println(group.next());
输出
1 2 3 io.netty.channel.DefaultEventLoop@60f82f98 io.netty.channel.DefaultEventLoop@35f983a6 io.netty.channel.DefaultEventLoop@60f82f98
也可以使用 for 循环
1 2 3 4 DefaultEventLoopGroup group = new DefaultEventLoopGroup (2 );for (EventExecutor eventLoop : group) { System.out.println(eventLoop); }
输出
1 2 io.netty.channel.DefaultEventLoop@60f82f98 io.netty.channel.DefaultEventLoop@35f983a6
优雅关闭💡 优雅关闭 shutdownGracefully
方法。该方法会首先切换 EventLoopGroup
到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的
演示 NioEventLoop 处理 io 事件 服务器端两个 nio worker 工人
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 new ServerBootstrap () .group(new NioEventLoopGroup (1 ), new NioEventLoopGroup (2 )) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) { ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) { ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null ; if (byteBuf != null ) { byte [] buf = new byte [16 ]; ByteBuf len = byteBuf.readBytes(buf, 0 , byteBuf.readableBytes()); log.debug(new String (buf)); } } }); } }).bind(8080 ).sync();
客户端,启动三次,分别修改发送字符串为 zhangsan(第一次),lisi(第二次),wangwu(第三次)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public static void main (String[] args) throws InterruptedException { Channel channel = new Bootstrap () .group(new NioEventLoopGroup (1 )) .handler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { System.out.println("init..." ); ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); } }) .channel(NioSocketChannel.class).connect("localhost" , 8080 ) .sync() .channel(); channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu" .getBytes())); Thread.sleep(2000 ); channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu" .getBytes()));
最后输出
1 2 3 4 5 6 22:03:34 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan 22:03:36 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan 22:05:36 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi 22:05:38 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi 22:06:09 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu 22:06:11 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu
可以看到两个工人轮流处理 channel,但工人与 channel 之间进行了绑定
再增加两个非 nio 工人
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 DefaultEventLoopGroup normalWorkers = new DefaultEventLoopGroup (2 );new ServerBootstrap () .group(new NioEventLoopGroup (1 ), new NioEventLoopGroup (2 )) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) { ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); ch.pipeline().addLast(normalWorkers,"myhandler" , new ChannelInboundHandlerAdapter () { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) { ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null ; if (byteBuf != null ) { byte [] buf = new byte [16 ]; ByteBuf len = byteBuf.readBytes(buf, 0 , byteBuf.readableBytes()); log.debug(new String (buf)); } } }); } }).bind(8080 ).sync();
客户端代码不变,启动三次,分别修改发送字符串为 zhangsan(第一次),lisi(第二次),wangwu(第三次)
输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] REGISTERED 22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] ACTIVE 22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ: 8B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 7a 68 61 6e 67 73 61 6e |zhangsan | +--------+-------------------------------------------------+----------------+ 22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ COMPLETE 22:19:48 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - zhangsan 22:19:50 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ: 8B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 7a 68 61 6e 67 73 61 6e |zhangsan | +--------+-------------------------------------------------+----------------+ 22:19:50 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ COMPLETE 22:19:50 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - zhangsan 22:20:24 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] REGISTERED 22:20:24 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] ACTIVE 22:20:25 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ: 4B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 6c 69 73 69 |lisi | +--------+-------------------------------------------------+----------------+ 22:20:25 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ COMPLETE 22:20:25 [DEBUG] [defaultEventLoopGroup-2-2] c.i.o.EventLoopTest - lisi 22:20:27 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ: 4B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 6c 69 73 69 |lisi | +--------+-------------------------------------------------+----------------+ 22:20:27 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ COMPLETE 22:20:27 [DEBUG] [defaultEventLoopGroup-2-2] c.i.o.EventLoopTest - lisi 22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] REGISTERED 22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] ACTIVE 22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ: 6B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 77 61 6e 67 77 75 |wangwu | +--------+-------------------------------------------------+----------------+ 22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ COMPLETE 22:20:38 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - wangwu 22:20:40 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ: 6B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 77 61 6e 67 77 75 |wangwu | +--------+-------------------------------------------------+----------------+ 22:20:40 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ COMPLETE 22:20:40 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - wangwu
可以看到,nio 工人和 非 nio 工人也分别绑定了 channel(LoggingHandler 由 nio 工人执行,而我们自己的 handler 由非 nio 工人执行)
handler 执行中如何换人?💡 关键代码 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 static void invokeChannelRead (final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg" ), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable () { @Override public void run () { next.invokeChannelRead(m); } }); } }
如果两个 handler 绑定的是同一个线程,那么就直接调用
否则,把要调用的代码封装为一个任务对象,由下一个 handler 的线程来调用
演示 NioEventLoop 处理普通任务 NioEventLoop 除了可以处理 io 事件,同样可以向它提交普通任务
1 2 3 4 5 6 7 NioEventLoopGroup nioWorkers = new NioEventLoopGroup (2 );log.debug("server start..." ); Thread.sleep(2000 ); nioWorkers.execute(()->{ log.debug("normal task..." ); });
输出
1 2 22:30:36 [DEBUG] [main] c.i.o.EventLoopTest2 - server start... 22:30:38 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - normal task...
可以用来执行耗时较长的任务
演示 NioEventLoop 处理定时任务 1 2 3 4 5 6 7 NioEventLoopGroup nioWorkers = new NioEventLoopGroup (2 );log.debug("server start..." ); Thread.sleep(2000 ); nioWorkers.scheduleAtFixedRate(() -> { log.debug("running..." ); }, 0 , 1 , TimeUnit.SECONDS);
输出
1 2 3 4 5 6 22:35:15 [DEBUG] [main] c.i.o.EventLoopTest2 - server start... 22:35:17 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running... 22:35:18 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running... 22:35:19 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running... 22:35:20 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running... ...
可以用来执行定时任务
eventLoopThreads 是多少?
Channel 概念和作用
Netty中的Channel是与网络套接字相关的,可以理解为是socket连接,在客户端与服务端连接的时候就会 建立一个Channel,它负责基本的IO操作,比如:bind()、connect(),read(),write() 等
主要作用:
通过Channel可获得当前网络连接的通道状态。
通过Channel可获得网络连接的配置参数(缓冲区大小等)。
Channel提供异步的网络I/O操作,比如连接的建立、数据的读写、端口的绑定等。
不同协议、不同的I/O类型的连接都有不同的 Channel 类型与之对应
channel 的主要作用
close() 可以用来关闭 channel
closeFuture() 用来处理 channel 的关闭
sync 方法作用是同步等待 channel 关闭
而 addListener 方法是异步等待 channel 关闭
pipeline() 方法添加处理器
write() 方法将数据写入
writeAndFlush() 方法将数据写入并刷出
ChannelFuture 这时刚才的客户端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 new Bootstrap () .group(new NioEventLoopGroup ()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <Channel>() { @Override protected void initChannel (Channel ch) { ch.pipeline().addLast(new StringEncoder ()); } }) .connect("127.0.0.1" , 8080 ) .sync() .channel() .writeAndFlush(new Date () + ": hello world!" );
现在把它拆开来看
1 2 3 4 5 6 7 8 9 10 11 12 ChannelFuture channelFuture = new Bootstrap () .group(new NioEventLoopGroup ()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <Channel>() { @Override protected void initChannel (Channel ch) { ch.pipeline().addLast(new StringEncoder ()); } }) .connect("127.0.0.1" , 8080 ); channelFuture.sync().channel().writeAndFlush(new Date () + ": hello world!" );
1 处返回的是 ChannelFuture 对象,它的作用是利用 channel() 方法来获取 Channel 对象
注意 connect 方法是异步的,意味着不等连接建立,方法执行就返回了。因此 channelFuture 对象中不能【立刻】获得到正确的 Channel 对象
实验如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 ChannelFuture channelFuture = new Bootstrap () .group(new NioEventLoopGroup ()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <Channel>() { @Override protected void initChannel (Channel ch) { ch.pipeline().addLast(new StringEncoder ()); } }) .connect("127.0.0.1" , 8080 ); System.out.println(channelFuture.channel()); channelFuture.sync(); System.out.println(channelFuture.channel());
执行到 1 时,连接未建立,打印 [id: 0x2e1884dd]
执行到 2 时,sync 方法是同步等待连接建立完成
执行到 3 时,连接肯定建立了,打印 [id: 0x2e1884dd, L:/127.0.0.1:57191 - R:/127.0.0.1:8080]
除了用 sync 方法可以让异步操作同步以外,还可以使用回调的方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 ChannelFuture channelFuture = new Bootstrap () .group(new NioEventLoopGroup ()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <Channel>() { @Override protected void initChannel (Channel ch) { ch.pipeline().addLast(new StringEncoder ()); } }) .connect("127.0.0.1" , 8080 ); System.out.println(channelFuture.channel()); channelFuture.addListener((ChannelFutureListener) future -> { System.out.println(future.channel()); });
执行到 1 时,连接未建立,打印 [id: 0x749124ba]
ChannelFutureListener 会在连接建立时被调用(其中 operationComplete 方法),因此执行到 2 时,连接肯定建立了,打印 [id: 0x749124ba, L:/127.0.0.1:57351 - R:/127.0.0.1:8080]
CloseFuture 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @Slf4j public class CloseFutureClient { public static void main (String[] args) throws InterruptedException { NioEventLoopGroup group new NioEventLoopGroup (); ChannelFuture channelFuture = new Bootstrap () .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); ch.pipeline().addLast(new StringEncoder ()); } }) .connect(new InetSocketAddress ("localhost" , 8080 )); Channel channel = channelFuture.sync().channel(); log.debug("{}" , channel); new Thread (()->{ Scanner scanner = new Scanner (System.in); while (true ) { String line = scanner.nextLine(); if ("q" .equals(line)) { channel.close(); break ; } channel.writeAndFlush(line); } }, "input" ).start(); ChannelFuture closeFuture = channel.closeFuture(); closeFuture.addListener(new ChannelFutureListener () { @Override public void operationComplete (ChannelFuture future) throws Exception { log.debug("处理关闭之后的操作" ); group.shutdownGracefully(); } }); } }
异步提升的是什么💡
思考下面的场景,4 个医生给人看病,每个病人花费 20 分钟,而且医生看病的过程中是以病人为单位的,一个病人看完了,才能看下一个病人。假设病人源源不断地来,可以计算一下 4 个医生一天工作 8 小时,处理的病人总数是:4 * 8 * 3 = 96
经研究发现,看病可以细分为四个步骤,经拆分后每个步骤需要 5 分钟,如下
因此可以做如下优化,只有一开始,医生 2、3、4 分别要等待 5、10、15 分钟才能执行工作,但只要后续病人源源不断地来,他们就能够满负荷工作,并且处理病人的能力提高到了 4 * 8 * 12
效率几乎是原来的四倍
要点
单线程没法异步提高效率,必须配合多线程、多核 cpu 才能发挥异步的优势
异步并没有缩短响应时间,反而有所增加(提升的是吞吐量,单位时间内能够处理的数量)
合理进行任务拆分,也是利用异步的关键
Future 和 Promise
future和promise,目的是将值(future)与其计算方式(promise) 分离,从而允许更灵活地进行计算,特别是通过并行化。Future 表示 目标计算的返回值,Promise 表示计算的方式,这个模型将返回结果 和计算逻辑分离,目的是为了让计算逻辑不影响返回结果,从而抽象 出一套异步编程模型。而计算逻辑与结果关联的纽带就是 callback。
Netty中有非常多的异步调用,譬如:client/server的启动,连接,数 据的读写等操作都是支持异步的
Promise机制 Netty的Future,只是增加了监听器。整个异步的状态,是不能进行设置和修改的,于是Netty的 Promise接口扩展了
Netty的Future接口,可以设置异步执行的结果。在IO操作过程,如果顺利完成、或者发生异常,都可以设置Promise的 结果,并且通知Promise的Listener们。
netty内部用了很多channel future监听。我们自己一般不用
ChannelPromise ChannelPromise接口,则继承扩展了Promise和ChannelFuture。所以,ChannelPromise既绑定了Channel,又具备
了设置监听回调的功能,还可以设置IO操作的结果,是Netty实际编程使用的最多的接口
异步结束时机不可控。所以netty中提供了promise机制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Test public void testPromise () throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup (); Promise promise = new DefaultPromise (group.next()); group.submit(()->{ log.info("---异步线程执行任务开始----,time={}" , LocalDateTime.now().toString()); try { int i = 1 /0 ; TimeUnit.SECONDS.sleep(3 ); promise.setSuccess("hello netty promise" ); TimeUnit.SECONDS.sleep(3 ); log.info("---异步线程执行任务结束----,time={}" , LocalDateTime.now().toString()); return ; } catch (Exception e) { promise.setFailure(e); } }); promise.addListener(future -> { log.info("----异步任务执行结果:{}" ,future.isSuccess()); }); promise.addListener(future2 -> { log.info("----异步任务执行结果:{}" ,future2.isSuccess()); }); log.info("---主线程----" ); TimeUnit.SECONDS.sleep(10 ); }
通过promise控制回调监听执行时机。
在异步处理时,经常用到这两个接口
首先要说明 netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展
jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称
jdk Future
netty Future
Promise
cancel
取消任务
-
-
isCanceled
任务是否取消
-
-
isDone
任务是否完成,不能区分成功失败
-
-
get
获取任务结果,阻塞等待
-
-
getNow
-
获取任务结果,非阻塞,还未产生结果时返回 null
-
await
-
等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断
-
sync
-
等待任务结束,如果任务失败,抛出异常
-
isSuccess
-
判断任务是否成功
-
cause
-
获取失败信息,非阻塞,如果没有失败,返回null
-
addLinstener
-
添加回调,异步接收结果
-
setSuccess
-
-
设置成功结果
setFailure
-
-
设置失败结果
例0 jdk中的future
例0 netty中的future-同步处理
例0 netty中的future-异步处理
例1 同步处理任务成功
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 DefaultEventLoop eventExecutors = new DefaultEventLoop ();DefaultPromise<Integer> promise = new DefaultPromise <>(eventExecutors); eventExecutors.execute(()->{ try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("set success, {}" ,10 ); promise.setSuccess(10 ); }); log.debug("start..." ); log.debug("{}" ,promise.getNow()); log.debug("{}" ,promise.get());
输出
1 2 3 4 11:51:53 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - start... 11:51:53 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - null 11:51:54 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - set success, 10 11:51:54 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - 10
例2 异步处理任务成功
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 DefaultEventLoop eventExecutors = new DefaultEventLoop ();DefaultPromise<Integer> promise = new DefaultPromise <>(eventExecutors); promise.addListener(future -> { log.debug("{}" ,future.getNow()); }); eventExecutors.execute(()->{ try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("set success, {}" ,10 ); promise.setSuccess(10 ); }); log.debug("start..." );
输出
1 2 3 11:49:30 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - start... 11:49:31 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - set success, 10 11:49:31 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - 10
例3 同步处理任务失败 - sync & get
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 DefaultEventLoop eventExecutors = new DefaultEventLoop (); DefaultPromise<Integer> promise = new DefaultPromise <>(eventExecutors); eventExecutors.execute(() -> { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } RuntimeException e = new RuntimeException ("error..." ); log.debug("set failure, {}" , e.toString()); promise.setFailure(e); }); log.debug("start..." ); log.debug("{}" , promise.getNow()); promise.get();
输出
1 2 3 4 5 6 7 8 9 10 11 12 13 12:11:07 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - start... 12:11:07 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - null 12:11:08 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - set failure, java.lang.RuntimeException: error... Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: error... at io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:41) at com.itcast.oio.DefaultPromiseTest2.main(DefaultPromiseTest2.java:34) Caused by: java.lang.RuntimeException: error... at com.itcast.oio.DefaultPromiseTest2.lambda$main$0(DefaultPromiseTest2.java:27) at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:745)
例4 同步处理任务失败 - await
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 DefaultEventLoop eventExecutors = new DefaultEventLoop ();DefaultPromise<Integer> promise = new DefaultPromise <>(eventExecutors); eventExecutors.execute(() -> { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } RuntimeException e = new RuntimeException ("error..." ); log.debug("set failure, {}" , e.toString()); promise.setFailure(e); }); log.debug("start..." ); log.debug("{}" , promise.getNow()); promise.await(); log.debug("result {}" , (promise.isSuccess() ? promise.getNow() : promise.cause()).toString());
输出
1 2 3 4 12:18:53 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - start... 12:18:53 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - null 12:18:54 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - set failure, java.lang.RuntimeException: error... 12:18:54 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - result java.lang.RuntimeException: error...
例5 异步处理任务失败
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 DefaultEventLoop eventExecutors = new DefaultEventLoop ();DefaultPromise<Integer> promise = new DefaultPromise <>(eventExecutors); promise.addListener(future -> { log.debug("result {}" , (promise.isSuccess() ? promise.getNow() : promise.cause()).toString()); }); eventExecutors.execute(() -> { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } RuntimeException e = new RuntimeException ("error..." ); log.debug("set failure, {}" , e.toString()); promise.setFailure(e); }); log.debug("start..." );
输出
1 2 3 12:04:57 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - start... 12:04:58 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - set failure, java.lang.RuntimeException: error... 12:04:58 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - result java.lang.RuntimeException: error...
例6 await 死锁检查
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 DefaultEventLoop eventExecutors = new DefaultEventLoop ();DefaultPromise<Integer> promise = new DefaultPromise <>(eventExecutors); eventExecutors.submit(()->{ System.out.println("1" ); try { promise.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("2" ); }); eventExecutors.submit(()->{ System.out.println("3" ); try { promise.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("4" ); });
输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 1 2 3 4 io.netty.util.concurrent.BlockingOperationException: DefaultPromise@47499c2a(incomplete) at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:384) at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:212) at com.itcast.oio.DefaultPromiseTest.lambda$main$0(DefaultPromiseTest.java:27) at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38) at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73) at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:745) io.netty.util.concurrent.BlockingOperationException: DefaultPromise@47499c2a(incomplete) at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:384) at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:212) at com.itcast.oio.DefaultPromiseTest.lambda$main$1(DefaultPromiseTest.java:36) at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38) at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73) at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:745)
Handler 和 Pipeline ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline
入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工
ChannelHandlerAdapter提供了一些方法的默认实现,可减少用户对于ChannelHandler的编写
ChannelDuplexHandler:混合型,既能处理入站事件又能处理出站事件。
打个比喻,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而后面要讲的 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品
处理器ChannelHandler
ChannelPipeline 提供了 ChannelHandler 链的容器。以服务端程序为例,客户端发送过来的数据要接收,读取处理,我们称数据是入站的,需要经过一系列Handler处理后;如果服务器想向客户端写回数据,也需要经过一系列Handler处理,我们称数据是出站的。(双向绑定)
inbound/outbound
inbound入站事件处理顺序(方向)是由链表的头到链表尾,outbound事件的处理顺序是由链表尾到链表头。
inbound入站事件由netty内部触发,最终由netty外部的代码消费。
outbound事件由netty外部的代码触发,最终由netty内部消费。
先搞清楚顺序,服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 new ServerBootstrap () .group(new NioEventLoopGroup ()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer <NioSocketChannel>() { protected void initChannel (NioSocketChannel ch) { ch.pipeline().addLast(new ChannelInboundHandlerAdapter (){ @Override public void channelRead (ChannelHandlerContext ctx, Object msg) { System.out.println(1 ); ctx.fireChannelRead(msg); } }); ch.pipeline().addLast(new ChannelInboundHandlerAdapter (){ @Override public void channelRead (ChannelHandlerContext ctx, Object msg) { System.out.println(2 ); ctx.fireChannelRead(msg); } }); ch.pipeline().addLast(new ChannelInboundHandlerAdapter (){ @Override public void channelRead (ChannelHandlerContext ctx, Object msg) { System.out.println(3 ); ctx.channel().write(msg); } }); ch.pipeline().addLast(new ChannelOutboundHandlerAdapter (){ @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println(4 ); ctx.write(msg, promise); } }); ch.pipeline().addLast(new ChannelOutboundHandlerAdapter (){ @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println(5 ); ctx.write(msg, promise); } }); ch.pipeline().addLast(new ChannelOutboundHandlerAdapter (){ @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println(6 ); ctx.write(msg, promise); } }); } }) .bind(8080 );
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 new Bootstrap () .group(new NioEventLoopGroup ()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <Channel>() { @Override protected void initChannel (Channel ch) { ch.pipeline().addLast(new StringEncoder ()); } }) .connect("127.0.0.1" , 8080 ) .addListener((ChannelFutureListener) future -> { future.channel().writeAndFlush("hello,world" ); });
服务器端打印:
可以看到,ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表
入站处理器中,ctx.fireChannelRead(msg)
是 调用下一个入站处理器
如果注释掉 1 处代码,则仅会打印 1
如果注释掉 2 处代码,则仅会打印 1 2
3 处的 ctx.channel().write(msg)
会 从尾部开始触发 后续出站处理器的执行
类似的,出站处理器中,ctx.write(msg, promise)
的调用也会 触发上一个出站处理器
如果注释掉 6 处代码,则仅会打印 1 2 3 6
ctx.channel().write(msg) vs ctx.write(msg)
都是触发出站处理器的执行
ctx.channel().write(msg)
从尾部开始查找出站处理器
ctx.write(msg)
是从当前节点找上一个出站处理器
3 处的 ctx.channel().write(msg)
如果改为 ctx.write(msg)
仅会打印 1 2 3
,因为节点3 之前没有其它出站处理器了
6 处的 ctx.write(msg, promise)
如果改为 ctx.channel().write(msg)
会打印 1 2 3 6 6 6...
因为 ctx.channel().write()
是从尾部开始查找,结果又是节点6 自己
简单的说就是,ctx.channel()
每次都是从后往前找,会出现重复调用的样子。当然一般建议出站事件放前面,就可以规避这个问题。
图1 - 服务端 pipeline 触发的原始流程,图中数字代表了处理步骤的先后次序
hannelInboundHandlerAdapter 和 SimpleChannelInboundHandler inboundHandler继承谁?
对于编写Netty数据入站处理器,可以选择继承 ChannelInboundHandlerAdapter,也可以选择继承 SimpleChannelInboundHandler<I>
,区别是什么?
ByteBuf 是对字节数据的封装
Java NIO 提供了ByteBuffer 作为它的字节容器,但是这个类使用起来过于复杂,而且也有些繁琐。Netty使用ByteBuf来替代ByteBuffer,它是一个强大的实现,既解决了JDK API 的局限性, 又为网络应用程序的开发者提供了更好的API
从结构上来说,ByteBuf 由一串字节数组构成。数组中每个字节用来存放信息,ByteBuf提供了两个索引,一个用于读取数据(readerIndex ),一个用于写入数据(writerIndex)。这两个索引通过在字节数组中移动,来定位需要读或者写信息的位置。而JDK的ByteBuffer只有一个索引,因此需要使用flip方法进行读写切换
1)创建 1 2 ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10 ); log(buffer);
上面代码创建了一个默认的 ByteBuf(池化基于直接内存的 ByteBuf),初始容量是 10
输出
1 read index:0 write index:0 capacity:10
其中 log 方法参考如下
1 2 3 4 5 6 7 8 9 10 11 private static void log (ByteBuf buffer) { int length = buffer.readableBytes(); int rows = length / 16 + (length % 15 == 0 ? 0 : 1 ) + 4 ; StringBuilder buf = new StringBuilder (rows * 80 * 2 ) .append("read index:" ).append(buffer.readerIndex()) .append(" write index:" ).append(buffer.writerIndex()) .append(" capacity:" ).append(buffer.capacity()) .append(NEWLINE); appendPrettyHexDump(buf, buffer); System.out.println(buf.toString()); }
2)直接内存 vs 堆内存 ByteBuf三类使用模式 三类ByteBuf
堆缓冲区(HeapByteBuf):内存分配在jvm堆,分配和回收速度比较快,可以被JVM自动回收,缺点是,如果进行 socket的IO读写,需要额外做一次内存复制,将堆内存对应的缓冲区复制到内核Channel中,性能会有一定程度的下 降。由于在堆上被 JVM 管理,在不被使用时可以快速释放。可以通过 ByteBuf.array()
来获取 byte[] 数据。
直接缓冲区(DirectByteBuf):内存分配的是堆外内存(系统内存),相比堆内存,它的分配和回收速度会慢一些, 但是将它写入或从Socket Channel中读取时,由于减少了一次内存拷贝,速度比堆内存块。
复合缓冲区(CompositeByteBuf):顾名思义就是将两个不同的缓冲区从逻辑上合并,让使用更加方便。
Netty默认使用的是DirectByteBuf,如果需要使用HeapByteBuf模式,则需要进行系统参数的设置
1 //设置HeapByteBuf模式,但ByteBuf 的分配器ByteBufAllocator要设置为非池化,否则不能 切换到堆缓冲器模式 System.setProperty("io.netty.noUnsafe", "true")
关于堆外内存的理解
可以使用下面的代码来创建池化基于堆的 ByteBuf
1 ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10 );
也可以使用下面的代码来创建池化基于直接内存的 ByteBuf
1 ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10 );
直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放
3)池化 vs 非池化 池化的最大意义在于可以重用 ByteBuf,优点有
没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
高并发时,池化功能更节约内存,减少内存溢出的可能
池化功能是否开启,可以通过下面的系统环境变量来设置
1 2 -Dio.netty.allocator.type={unpooled|pooled} 也可以在代码里设置System.setProperty("io.netty.allocator.type" , "unpooled" );
4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现
4.1 之前,池化功能还不成熟,默认是非池化实现
池化只能netty自己内部用.不让开发者用.怕用不好
4)组成 ByteBuf 由四部分组成
最开始读写指针都在 0 位置
readerIndex:指示读取的起始位置, 每读取一个字节, readerIndex自增累加1。 如果readerIndex 与 writerIndex 相等,ByteBuf 不可读。
writerIndex:指示写入的起始位置, 每写入一个字节, writeIndex自增累加1。如果增加到 writerIndex 与 capacity() 容量相等,表示 ByteBuf 已经不可写,但是这个时候,并不代表不能往 ByteBuf 中写数据了, 如果发现往ByteBuf 中写数据写不进去的话,Netty 会自动扩容 ByteBuf,直到扩容到底层的内存大小为 maxCapacity
maxCapacity:指示ByteBuf 可以扩容的最大容量, 如果向ByteBuf写入数据时, 容量不足, 可以进行扩容的最大容量
5)写入 方法列表,省略一些不重要的方法
方法签名
含义
备注
writeBoolean(boolean value)
写入 boolean 值
用一字节 01|00 代表 true|false
writeByte(int value)
写入 byte 值
writeShort(int value)
写入 short 值
writeInt(int value)
写入 int 值
Big Endian,即 0x250,写入后 00 00 02 50 大端,先写高位
writeIntLE(int value)
写入 int 值
Little Endian,即 0x250,写入后 50 02 00 00
writeLong(long value)
写入 long 值
writeChar(int value)
写入 char 值
writeFloat(float value)
写入 float 值
writeDouble(double value)
写入 double 值
writeBytes(ByteBuf src)
写入 netty 的 ByteBuf
writeBytes(byte[] src)
写入 byte[]
writeBytes(ByteBuffer src)
写入 nio 的 ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset)
写入字符串
注意
这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用
网络传输,默认习惯是 Big Endian
先写入 4 个字节
1 2 buffer.writeBytes(new byte []{1 , 2 , 3 , 4 }); log(buffer);
结果是
1 2 3 4 5 6 read index:0 write index:4 capacity:10 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 01 02 03 04 |.... | +--------+-------------------------------------------------+----------------+
再写入一个 int 整数,也是 4 个字节
1 2 buffer.writeInt(5 ); log(buffer);
结果是
1 2 3 4 5 6 read index:0 write index:8 capacity:10 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 01 02 03 04 00 00 00 05 |........ | +--------+-------------------------------------------------+----------------+
还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置
6)扩容
再写入一个 int 整数时,容量不够了(初始容量是 10),这时会引发扩容
1 2 buffer.writeInt(6 ); log(buffer);
扩容规则是
如何写入后数据大小未超过 512,则选择下一个 16 的整数倍,例如写入后大小为 12 ,则扩容后 capacity 是 16
如果写入后数据大小超过 512,则选择下一个 2^n
,例如写入后大小为 513,则扩容后 capacity 是 2^10=1024
(2^9=512
已经不够了)
扩容不能超过 max capacity 会报错
结果是
1 2 3 4 5 6 read index:0 write index:12 capacity:16 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 01 02 03 04 00 00 00 05 00 00 00 06 |............ | +--------+-------------------------------------------------+----------------+
7)读取
例如读了 4 次,每次一个字节
1 2 3 4 5 System.out.println(buffer.readByte()); System.out.println(buffer.readByte()); System.out.println(buffer.readByte()); System.out.println(buffer.readByte()); log(buffer);
读过的内容,就属于废弃部分 了,再读只能读那些尚未读取的部分
1 2 3 4 5 6 7 8 9 10 1 2 3 4 read index:4 write index:12 capacity:16 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 00 00 00 05 00 00 00 06 |........ | +--------+-------------------------------------------------+----------------+
如果需要重复读取 int 整数 5,怎么办?
可以在 read 前先做个标记 mark
1 2 3 buffer.markReaderIndex(); System.out.println(buffer.readInt()); log(buffer);
结果
1 2 3 4 5 6 7 5 read index:8 write index:12 capacity:16 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 00 00 00 06 |.... | +--------+-------------------------------------------------+----------------+
这时要重复读取的话,重置到标记位置 reset
1 2 buffer.resetReaderIndex(); log(buffer);
这时
1 2 3 4 5 6 read index:4 write index:12 capacity:16 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 00 00 00 05 00 00 00 06 |........ | +--------+-------------------------------------------------+----------------+
还有种办法是采用 get 开头的一系列方法,这些方法不会改变 read index
8)retain & release
引用计数:ByteBuf如果采用的是堆缓冲区模式的话,可以由GC回收,但是如果采用的是直接缓冲区,就不受GC的管理,就得手动释放,否则会发生内存泄露,Netty自身引入了引用计数,提供了ReferenceCounted接口,当对象的引用计数>0时要保证对象不被释放,当为0时需要被释放
如何释放?
关于ByteBuf的释放,分为手动释放与自动释放:
手动释放,就是在使用完成后,调用ReferenceCountUtil.release(byteBuf); 进行释放,这种方式的弊端就是一旦忘 记释放就可能会造成内存泄露
自动释放有三种方式,分别是:入站的TailHandler(TailContext)、继承SimpleChannelInboundHandler、 HeadHandler(HeadContext)的出站释放
TailContext:Inbound流水线的末端,如果前面的handler都把消息向后传递最终由TailContext释放该消息,需 要注意的是,如果没有进行向下传递,是不会进行释放操作的
SimpleChannelInboundHandler:自定义的InboundHandler继承自SimpleChannelInboundHandler,在 SimpleChannelInboundHandler中自动释放
HeadContext:outbound流水线的末端,出站消息一般是由应用所申请,到达最后一站时,经过一轮复杂的调 用,在flush完成后终将被release掉
由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。
UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存
回收内存的源码实现,请关注下面方法的不同实现
protected abstract void deallocate()
Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口
每个 ByteBuf 对象的初始计数为 1
调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用
谁来负责 release 呢?
不是我们想象的(一般情况下)
1 2 3 4 5 6 ByteBuf buf = ...try { ... } finally { buf.release(); }
请思考,因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在 finally 中 release 了,就失去了传递性(当然,如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)
基本规则是,谁是最后使用者,谁负责 release ,详细分析如下
起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
入站 ByteBuf 处理原则
对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
出站 ByteBuf 处理原则
出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
异常处理原则
有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true
TailContext 释放未处理消息逻辑
1 2 3 4 5 6 7 8 9 10 protected void onUnhandledInboundMessage (Object msg) { try { logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration." , msg); } finally { ReferenceCountUtil.release(msg); } }
具体代码
1 2 3 4 5 6 7 public static boolean release (Object msg) { if (msg instanceof ReferenceCounted) { return ((ReferenceCounted) msg).release(); } return false ; }
小结:
对于入站消息:
对原消息不做处理,依次调用 ctx.fireChannelRead(msg)把原消息往下传,如果能到TailContext,那不用做什么释放,它会自动释放
将原消息转化为新的消息并调用 ctx.fireChannelRead(newMsg)往下传,那需要将原消息release掉
如果已经不再调用ctx.fireChannelRead(msg)传递任何消息,需要把原消息release掉。对于出站消息:则无需用户关心,消息最终都会走到HeadContext,flush之后会自动释放。
9)slice 【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针
例,原始 ByteBuf 进行一些初始操作
1 2 3 4 ByteBuf origin = ByteBufAllocator.DEFAULT.buffer(10 );origin.writeBytes(new byte []{1 , 2 , 3 , 4 }); origin.readByte(); System.out.println(ByteBufUtil.prettyHexDump(origin));
输出
1 2 3 4 5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 02 03 04 |... | +--------+-------------------------------------------------+----------------+
这时调用 slice 进行切片,无参 slice 是从原始 ByteBuf 的 read index 到 write index 之间的内容进行切片,切片后的 max capacity 被固定为这个区间的大小,因此不能追加 write
1 2 3 ByteBuf slice = origin.slice();System.out.println(ByteBufUtil.prettyHexDump(slice));
输出
1 2 3 4 5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 02 03 04 |... | +--------+-------------------------------------------------+----------------+
如果原始 ByteBuf 再次读操作(又读了一个字节)
1 2 origin.readByte(); System.out.println(ByteBufUtil.prettyHexDump(origin));
输出
1 2 3 4 5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 03 04 |.. | +--------+-------------------------------------------------+----------------+
这时的 slice 不受影响,因为它有独立的读写指针
1 System.out.println(ByteBufUtil.prettyHexDump(slice));
输出
1 2 3 4 5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 02 03 04 |... | +--------+-------------------------------------------------+----------------+
如果 slice 的内容发生了更改
1 2 slice.setByte(2 , 5 ); System.out.println(ByteBufUtil.prettyHexDump(slice));
输出
1 2 3 4 5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 02 03 05 |... | +--------+-------------------------------------------------+----------------+
这时,原始 ByteBuf 也会受影响,因为底层都是同一块内存
1 System.out.println(ByteBufUtil.prettyHexDump(origin));
输出
1 2 3 4 5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 03 05 |.. | +--------+-------------------------------------------------+----------------+
10)duplicate 【零拷贝】的体现之一,就好比截取了原始 ByteBuf 所有内容,并且没有 max capacity 的限制,也是与原始 ByteBuf 使用同一块底层内存,只是读写指针是独立的
11)copy 会将底层内存数据进行深拷贝,因此无论读写,都与原始 ByteBuf 无关
12)CompositeByteBuf 【零拷贝】的体现之一,可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免拷贝
有两个 ByteBuf 如下
1 2 3 4 5 6 ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5 );buf1.writeBytes(new byte []{1 , 2 , 3 , 4 , 5 }); ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5 );buf2.writeBytes(new byte []{6 , 7 , 8 , 9 , 10 }); System.out.println(ByteBufUtil.prettyHexDump(buf1)); System.out.println(ByteBufUtil.prettyHexDump(buf2));
输出
1 2 3 4 5 6 7 8 9 10 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 01 02 03 04 05 |..... | +--------+-------------------------------------------------+----------------+ +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 06 07 08 09 0a |..... | +--------+-------------------------------------------------+----------------+
现在需要一个新的 ByteBuf,内容来自于刚才的 buf1 和 buf2,如何实现?
方法1:
1 2 3 4 5 ByteBuf buf3 = ByteBufAllocator.DEFAULT .buffer(buf1.readableBytes()+buf2.readableBytes()); buf3.writeBytes(buf1); buf3.writeBytes(buf2); System.out.println(ByteBufUtil.prettyHexDump(buf3));
结果
1 2 3 4 5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 01 02 03 04 05 06 07 08 09 0a |.......... | +--------+-------------------------------------------------+----------------+
这种方法好不好?回答是不太好,因为进行了数据的内存复制操作
方法2:
1 2 3 CompositeByteBuf buf3 = ByteBufAllocator.DEFAULT.compositeBuffer();buf3.addComponents(true , buf1, buf2);
结果是一样的
1 2 3 4 5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 01 02 03 04 05 06 07 08 09 0a |.......... | +--------+-------------------------------------------------+----------------+
CompositeByteBuf 是一个组合的 ByteBuf,它内部维护了一个 Component 数组,每个 Component 管理一个 ByteBuf,记录了这个 ByteBuf 相对于整体偏移量等信息,代表着整体中某一段的数据。
优点,对外是一个虚拟视图,组合这些 ByteBuf 不会产生内存复制
缺点,复杂了很多,多次操作会带来性能的损耗
13)Unpooled
对于Pooled类型的ByteBuf,不管是PooledDirectByteBuf还是PooledHeapByteBuf都只能由Netty内部自己使用(构造是私有和受保护的),开发者可以使用Unpooled类型的ByteBuf。
Netty提供Unpooled工具类创建的ByteBuf都是unpooled类型,默认采用的Allocator是direct类型;当然用户可以自己选择创建UnpooledDirectByteBuf和UnpooledHeapByteBuf
Unpooled 是一个工具类,类如其名,提供了非池化的 ByteBuf 创建、组合、复制等操作
这里仅介绍其跟【零拷贝】相关的 wrappedBuffer 方法,可以用来包装 ByteBuf
1 2 3 4 5 6 7 8 ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5 );buf1.writeBytes(new byte []{1 , 2 , 3 , 4 , 5 }); ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5 );buf2.writeBytes(new byte []{6 , 7 , 8 , 9 , 10 }); ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);System.out.println(ByteBufUtil.prettyHexDump(buf3));
输出
1 2 3 4 5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 01 02 03 04 05 06 07 08 09 0a |.......... | +--------+-------------------------------------------------+----------------+
也可以用来包装普通字节数组,底层也不会有拷贝操作
1 2 3 ByteBuf buf4 = Unpooled.wrappedBuffer(new byte []{1 , 2 , 3 }, new byte []{4 , 5 , 6 });System.out.println(buf4.getClass()); System.out.println(ByteBufUtil.prettyHexDump(buf4));
输出
1 2 3 4 5 6 class io.netty.buffer.CompositeByteBuf +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 01 02 03 04 05 06 |...... | +--------+-------------------------------------------------+----------------+
ByteBuf 优势💡
池化 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
读写指针分离,不需要像 ByteBuffer 一样切换读写模式
可以自动扩容
支持链式调用,使用更流畅
很多地方体现零拷贝,例如 slice、duplicate、CompositeByteBuf
ByteBuf 的分配器
双向通信 练习 实现一个 echo server
编写 server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 new ServerBootstrap () .group(new NioEventLoopGroup ()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) { ch.pipeline().addLast(new ChannelInboundHandlerAdapter (){ @Override public void channelRead (ChannelHandlerContext ctx, Object msg) { ByteBuf buffer = (ByteBuf) msg; System.out.println(buffer.toString(Charset.defaultCharset())); ByteBuf response = ctx.alloc().buffer(); response.writeBytes(buffer); ctx.writeAndFlush(response); } }); } }).bind(8080 );
编写 client
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 NioEventLoopGroup group = new NioEventLoopGroup ();Channel channel = new Bootstrap () .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder ()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) { ByteBuf buffer = (ByteBuf) msg; System.out.println(buffer.toString(Charset.defaultCharset())); } }); } }).connect("127.0.0.1" , 8080 ).sync().channel(); channel.closeFuture().addListener(future -> { group.shutdownGracefully(); }); new Thread (() -> { Scanner scanner = new Scanner (System.in); while (true ) { String line = scanner.nextLine(); if ("q" .equals(line)) { channel.close(); break ; } channel.writeAndFlush(line); } }).start();
读和写的误解💡 我最初在认识上有这样的误区,认为只有在 netty,nio 这样的多路复用 IO 模型时,读写才不会相互阻塞,才可以实现高效的双向通信,但实际上,Java Socket 是全双工的:在任意时刻,线路上存在A 到 B
和 B 到 A
的双向信号传输。即使是阻塞 IO,读和写是可以同时进行的,只要分别采用读线程和写线程即可,读不会阻塞写、写也不会阻塞读
例如
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class TestServer { public static void main (String[] args) throws IOException { ServerSocket ss = new ServerSocket (8888 ); Socket s = ss.accept(); new Thread (() -> { try { BufferedReader reader = new BufferedReader (new InputStreamReader (s.getInputStream())); while (true ) { System.out.println(reader.readLine()); } } catch (IOException e) { e.printStackTrace(); } }).start(); new Thread (() -> { try { BufferedWriter writer = new BufferedWriter (new OutputStreamWriter (s.getOutputStream())); for (int i = 0 ; i < 100 ; i++) { writer.write(String.valueOf(i)); writer.newLine(); writer.flush(); } } catch (IOException e) { e.printStackTrace(); } }).start(); } }
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class TestClient { public static void main (String[] args) throws IOException { Socket s = new Socket ("localhost" , 8888 ); new Thread (() -> { try { BufferedReader reader = new BufferedReader (new InputStreamReader (s.getInputStream())); while (true ) { System.out.println(reader.readLine()); } } catch (IOException e) { e.printStackTrace(); } }).start(); new Thread (() -> { try { BufferedWriter writer = new BufferedWriter (new OutputStreamWriter (s.getOutputStream())); for (int i = 0 ; i < 100 ; i++) { writer.write(String.valueOf(i)); writer.newLine(); writer.flush(); } } catch (IOException e) { e.printStackTrace(); } }).start(); } }
Netty进阶学习 粘包与半包
粘包现象 服务端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public class HelloWorldServer { static final Logger log = LoggerFactory.getLogger(HelloWorldServer.class); void start () { NioEventLoopGroup boss = new NioEventLoopGroup (1 ); NioEventLoopGroup worker = new NioEventLoopGroup (); try { ServerBootstrap serverBootstrap = new ServerBootstrap (); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.group(boss, worker); serverBootstrap.childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { log.debug("connected {}" , ctx.channel()); super .channelActive(ctx); } @Override public void channelInactive (ChannelHandlerContext ctx) throws Exception { log.debug("disconnect {}" , ctx.channel()); super .channelInactive(ctx); } }); } }); ChannelFuture channelFuture = serverBootstrap.bind(8080 ); log.debug("{} binding..." , channelFuture.channel()); channelFuture.sync(); log.debug("{} bound..." , channelFuture.channel()); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("server error" , e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); log.debug("stoped" ); } } public static void main (String[] args) { new HelloWorldServer ().start(); } }
客户端代码希望发送 10 个消息,每个消息是 16 字节
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class HelloWorldClient { static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class); public static void main (String[] args) { NioEventLoopGroup worker = new NioEventLoopGroup (); try { Bootstrap bootstrap = new Bootstrap (); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { log.debug("connetted..." ); ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { log.debug("sending..." ); Random r = new Random (); char c = 'a' ; for (int i = 0 ; i < 10 ; i++) { ByteBuf buffer = ctx.alloc().buffer(); buffer.writeBytes(new byte []{0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 , 13 , 14 , 15 }); ctx.writeAndFlush(buffer); } } }); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1" , 8080 ).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("client error" , e); } finally { worker.shutdownGracefully(); } } }
服务器端的某次输出,可以看到一次就接收了 160 个字节,而非分 10 次接收
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 08:24:46 [DEBUG] [main] c.i.n.HelloWorldServer - [id: 0x81e0fda5] binding... 08:24:46 [DEBUG] [main] c.i.n.HelloWorldServer - [id: 0x81e0fda5, L:/0:0:0:0:0:0:0:0:8080] bound... 08:24:55 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177] REGISTERED 08:24:55 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177] ACTIVE 08:24:55 [DEBUG] [nioEventLoopGroup-3-1] c.i.n.HelloWorldServer - connected [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177] 08:24:55 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177] READ: 160B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000010| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000020| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000030| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000040| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000050| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000060| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000070| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000080| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000090| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| +--------+-------------------------------------------------+----------------+ 08:24:55 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177] READ COMPLETE
半包现象 客户端代码希望发送 1 个消息,这个消息是 160 字节,代码改为
1 2 3 4 5 ByteBuf buffer = ctx.alloc().buffer();for (int i = 0 ; i < 10 ; i++) { buffer.writeBytes(new byte []{0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 , 13 , 14 , 15 }); } ctx.writeAndFlush(buffer);
为现象明显,服务端修改一下接收缓冲区,其它代码不变
1 serverBootstrap.option(ChannelOption.SO_RCVBUF, 10 );
服务器端的某次输出,可以看到接收的消息被分为两节,第一次 20 字节,第二次 140 字节
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 08:43:49 [DEBUG] [main] c.i.n.HelloWorldServer - [id: 0x4d6c6a84] binding... 08:43:49 [DEBUG] [main] c.i.n.HelloWorldServer - [id: 0x4d6c6a84, L:/0:0:0:0:0:0:0:0:8080] bound... 08:44:23 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] REGISTERED 08:44:23 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] ACTIVE 08:44:23 [DEBUG] [nioEventLoopGroup-3-1] c.i.n.HelloWorldServer - connected [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] 08:44:24 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] READ: 20B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000010| 00 01 02 03 |.... | +--------+-------------------------------------------------+----------------+ 08:44:24 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] READ COMPLETE 08:44:24 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] READ: 140B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................| |00000010| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................| |00000020| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................| |00000030| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................| |00000040| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................| |00000050| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................| |00000060| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................| |00000070| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................| |00000080| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |............ | +--------+-------------------------------------------------+----------------+ 08:44:24 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] READ COMPLETE
注意
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10) 影响的底层接收缓冲区(即滑动窗口)大小,仅决定了 netty 读取的最小单位,netty 实际每次读取的一般是它的整数倍
现象分析 从两个角度看粘包和拆包:
1、收发角度:一个发送可能被多次接收(半包),多个发送可能被一次接收(粘包)
2、传输角度:一个发送可能占用多个传输包(半包),多个发送可能公用一个传输包(粘包)
根本原因:
TCP 协议是面向连接的、可靠的、基于字节流的传输层通信协议,是一种流式协议,消息无边界
粘包
现象,发送 abc def
,接收 abcdef
原因
应用层:接收方 ByteBuf 设置太大(Netty 默认 1024)
滑动窗口:假设发送方 256 bytes 表示一个完整报文,但由于接收方处理不及时且窗口大小足够大,这 256 bytes 字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会粘包
Nagle 算法:会造成粘包
半包
现象,发送 abcdef
,接收 abc def
原因
应用层:接收方 ByteBuf 小于实际发送数据量
滑动窗口:假设接收方的窗口只剩了 128 bytes,发送方的报文大小是 256 bytes,这时放不下了,只能先发送前 128 bytes,等待 ack 后才能发送剩余部分,这就造成了半包
MSS 限制:当发送的数据超过 MSS 限制后,会将数据切分发送,就会造成半包
本质是因为 TCP 是流式协议,消息无边界
滑动窗口
TCP 以一个段(segment)为单位,每发送一个段就需要进行一次确认应答(ack)处理,但如果这么做,缺点是包的往返时间越长性能就越差
为了解决此问题,引入了窗口概念,窗口大小即决定了无需等待应答而可以继续发送的数据最大值
MSS 限制
链路层对一次能够发送的最大数据有限制,这个限制称之为 MTU(maximum transmission unit),不同的链路设备的 MTU 值也有所不同,例如
以太网的 MTU 是 1500
FDDI(光纤分布式数据接口)的 MTU 是 4352
本地回环地址的 MTU 是 65535 - 本地测试不走网卡
MSS 是最大段长度(maximum segment size),它是 MTU 刨去 tcp 头和 ip 头后剩余能够作为数据传输的字节数
ipv4 tcp 头占用 20 bytes,ip 头占用 20 bytes,因此以太网 MSS 的值为 1500 - 40 = 1460
TCP 在传递大量数据时,会按照 MSS 大小将数据进行分割发送
MSS 的值在三次握手时通知对方自己 MSS 的值,然后在两者之间选择一个小值作为 MSS
Nagle 算法
即使发送一个字节,也需要加入 tcp 头和 ip 头,也就是总字节数会使用 41 bytes,非常不经济。因此为了提高网络利用率,tcp 希望尽可能发送足够大的数据,这就是 Nagle 算法产生的缘由
该算法是指发送端即使还有应该发送的数据,但如果这部分数据很少的话,则进行延迟发送
如果 SO_SNDBUF 的数据达到 MSS,则需要发送
如果 SO_SNDBUF 中含有 FIN(表示需要连接关闭)这时将剩余数据发送,再关闭
如果 TCP_NODELAY = true,则需要发送
已发送的数据都收到 ack 时,则需要发送
上述条件不满足,但发生超时(一般为 200ms)则需要发送
除上述情况,延迟发送
解决方案
短链接,发一个包建立一次连接,这样连接建立到连接断开之间就是消息的边界,缺点效率太低
每一条消息采用固定长度,缺点浪费空间
每一条消息采用分隔符,例如 \n
,缺点需要转义
每一条消息分为 head 和 body,head 中包含 body 的长度
方法1 短链接 以解决粘包为例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public class HelloWorldClient { static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class); public static void main (String[] args) { for (int i = 0 ; i < 10 ; i++) { send(); } } private static void send () { NioEventLoopGroup worker = new NioEventLoopGroup (); try { Bootstrap bootstrap = new Bootstrap (); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { log.debug("conneted..." ); ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { log.debug("sending..." ); ByteBuf buffer = ctx.alloc().buffer(); buffer.writeBytes(new byte []{0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 , 13 , 14 , 15 }); ctx.writeAndFlush(buffer); ctx.close(); } }); } }); ChannelFuture channelFuture = bootstrap.connect("localhost" , 8080 ).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("client error" , e); } finally { worker.shutdownGracefully(); } } }
输出,略
半包用这种办法还是不好解决,因为接收方的缓冲区大小是有限的
方法2 固定长度 让所有数据包长度固定(假设长度为 8 字节),服务器端加入
1 ch.pipeline().addLast(new FixedLengthFrameDecoder (8 ));
客户端测试代码,注意, 采用这种方法后,客户端什么时候 flush 都可以
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public class HelloWorldClient { static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class); public static void main (String[] args) { NioEventLoopGroup worker = new NioEventLoopGroup (); try { Bootstrap bootstrap = new Bootstrap (); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { log.debug("connetted..." ); ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { log.debug("sending..." ); Random r = new Random (); char c = 'a' ; ByteBuf buffer = ctx.alloc().buffer(); for (int i = 0 ; i < 10 ; i++) { byte [] bytes = new byte [8 ]; for (int j = 0 ; j < r.nextInt(8 ); j++) { bytes[j] = (byte ) c; } c++; buffer.writeBytes(bytes); } ctx.writeAndFlush(buffer); } }); } }); ChannelFuture channelFuture = bootstrap.connect("192.168.0.103" , 9090 ).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("client error" , e); } finally { worker.shutdownGracefully(); } } }
客户端输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 12:07:00 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.HelloWorldClient - connetted... 12:07:00 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x3c2ef3c2] REGISTERED 12:07:00 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x3c2ef3c2] CONNECT: /192.168.0.103:9090 12:07:00 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x3c2ef3c2, L:/192.168.0.103:53155 - R:/192.168.0.103:9090] ACTIVE 12:07:00 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.HelloWorldClient - sending... 12:07:00 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x3c2ef3c2, L:/192.168.0.103:53155 - R:/192.168.0.103:9090] WRITE: 80B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 61 61 61 00 00 00 00 62 00 00 00 00 00 00 00 |aaaa....b.......| |00000010| 63 63 00 00 00 00 00 00 64 00 00 00 00 00 00 00 |cc......d.......| |00000020| 00 00 00 00 00 00 00 00 66 66 66 66 00 00 00 00 |........ffff....| |00000030| 67 67 67 00 00 00 00 00 68 00 00 00 00 00 00 00 |ggg.....h.......| |00000040| 69 69 69 69 69 00 00 00 6a 6a 6a 6a 00 00 00 00 |iiiii...jjjj....| +--------+-------------------------------------------------+----------------+ 12:07:00 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x3c2ef3c2, L:/192.168.0.103:53155 - R:/192.168.0.103:9090] FLUSH
服务端输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 12:06:51 [DEBUG] [main] c.i.n.HelloWorldServer - [id: 0xe3d9713f] binding... 12:06:51 [DEBUG] [main] c.i.n.HelloWorldServer - [id: 0xe3d9713f, L:/192.168.0.103:9090] bound... 12:07:00 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] REGISTERED 12:07:00 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] ACTIVE 12:07:00 [DEBUG] [nioEventLoopGroup-3-1] c.i.n.HelloWorldServer - connected [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] 12:07:00 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 61 61 61 00 00 00 00 |aaaa.... | +--------+-------------------------------------------------+----------------+ 12:07:00 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 62 00 00 00 00 00 00 00 |b....... | +--------+-------------------------------------------------+----------------+ 12:07:00 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 63 63 00 00 00 00 00 00 |cc...... | +--------+-------------------------------------------------+----------------+ 12:07:00 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 64 00 00 00 00 00 00 00 |d....... | +--------+-------------------------------------------------+----------------+ 12:07:00 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 00 00 00 00 00 00 00 00 |........ | +--------+-------------------------------------------------+----------------+ 12:07:00 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 66 66 66 66 00 00 00 00 |ffff.... | +--------+-------------------------------------------------+----------------+ 12:07:00 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 67 67 67 00 00 00 00 00 |ggg..... | +--------+-------------------------------------------------+----------------+ 12:07:00 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 68 00 00 00 00 00 00 00 |h....... | +--------+-------------------------------------------------+----------------+ 12:07:00 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 69 69 69 69 69 00 00 00 |iiiii... | +--------+-------------------------------------------------+----------------+ 12:07:00 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ: 8B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 6a 6a 6a 6a 00 00 00 00 |jjjj.... | +--------+-------------------------------------------------+----------------+ 12:07:00 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xd739f137, L:/192.168.0.103:9090 - R:/192.168.0.103:53155] READ COMPLETE
缺点是,数据包的大小不好把握
长度定的太大,浪费
长度定的太小,对某些数据包又显得不够
方法3 固定分隔符 服务端加入,默认以 \n
或 \r\n
作为分隔符,如果超出指定长度仍未出现分隔符,则抛出异常
1 ch.pipeline().addLast(new LineBasedFrameDecoder (1024 ));
客户端在每条消息之后,加入 \n
分隔符
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class HelloWorldClient { static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class); public static void main (String[] args) { NioEventLoopGroup worker = new NioEventLoopGroup (); try { Bootstrap bootstrap = new Bootstrap (); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { log.debug("connetted..." ); ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { log.debug("sending..." ); Random r = new Random (); char c = 'a' ; ByteBuf buffer = ctx.alloc().buffer(); for (int i = 0 ; i < 10 ; i++) { for (int j = 1 ; j <= r.nextInt(16 )+1 ; j++) { buffer.writeByte((byte ) c); } buffer.writeByte(10 ); c++; } ctx.writeAndFlush(buffer); } }); } }); ChannelFuture channelFuture = bootstrap.connect("192.168.0.103" , 9090 ).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("client error" , e); } finally { worker.shutdownGracefully(); } } }
客户端输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 14:08:18 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.HelloWorldClient - connetted... 14:08:18 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x1282d755] REGISTERED 14:08:18 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x1282d755] CONNECT: /192.168.0.103:9090 14:08:18 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x1282d755, L:/192.168.0.103:63641 - R:/192.168.0.103:9090] ACTIVE 14:08:18 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.HelloWorldClient - sending... 14:08:18 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x1282d755, L:/192.168.0.103:63641 - R:/192.168.0.103:9090] WRITE: 60B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 0a 62 62 62 0a 63 63 63 0a 64 64 0a 65 65 65 |a.bbb.ccc.dd.eee| |00000010| 65 65 65 65 65 65 65 0a 66 66 0a 67 67 67 67 67 |eeeeeee.ff.ggggg| |00000020| 67 67 0a 68 68 68 68 0a 69 69 69 69 69 69 69 0a |gg.hhhh.iiiiiii.| |00000030| 6a 6a 6a 6a 6a 6a 6a 6a 6a 6a 6a 0a |jjjjjjjjjjj. | +--------+-------------------------------------------------+----------------+ 14:08:18 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x1282d755, L:/192.168.0.103:63641 - R:/192.168.0.103:9090] FLUSH
服务端输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 14:08:18 [DEBUG] [nioEventLoopGroup-3-5] c.i.n.HelloWorldServer - connected [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] 14:08:18 [DEBUG] [nioEventLoopGroup-3-5] i.n.h.l.LoggingHandler - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 1B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 |a | +--------+-------------------------------------------------+----------------+ 14:08:18 [DEBUG] [nioEventLoopGroup-3-5] i.n.h.l.LoggingHandler - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 3B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 62 62 62 |bbb | +--------+-------------------------------------------------+----------------+ 14:08:18 [DEBUG] [nioEventLoopGroup-3-5] i.n.h.l.LoggingHandler - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 3B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 63 63 63 |ccc | +--------+-------------------------------------------------+----------------+ 14:08:18 [DEBUG] [nioEventLoopGroup-3-5] i.n.h.l.LoggingHandler - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 2B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 64 64 |dd | +--------+-------------------------------------------------+----------------+ 14:08:18 [DEBUG] [nioEventLoopGroup-3-5] i.n.h.l.LoggingHandler - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 10B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 65 65 65 65 65 65 65 65 65 65 |eeeeeeeeee | +--------+-------------------------------------------------+----------------+ 14:08:18 [DEBUG] [nioEventLoopGroup-3-5] i.n.h.l.LoggingHandler - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 2B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 66 66 |ff | +--------+-------------------------------------------------+----------------+ 14:08:18 [DEBUG] [nioEventLoopGroup-3-5] i.n.h.l.LoggingHandler - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 7B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 67 67 67 67 67 67 67 |ggggggg | +--------+-------------------------------------------------+----------------+ 14:08:18 [DEBUG] [nioEventLoopGroup-3-5] i.n.h.l.LoggingHandler - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 4B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 68 68 68 68 |hhhh | +--------+-------------------------------------------------+----------------+ 14:08:18 [DEBUG] [nioEventLoopGroup-3-5] i.n.h.l.LoggingHandler - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 7B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 69 69 69 69 69 69 69 |iiiiiii | +--------+-------------------------------------------------+----------------+ 14:08:18 [DEBUG] [nioEventLoopGroup-3-5] i.n.h.l.LoggingHandler - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ: 11B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 6a 6a 6a 6a 6a 6a 6a 6a 6a 6a 6a |jjjjjjjjjjj | +--------+-------------------------------------------------+----------------+ 14:08:18 [DEBUG] [nioEventLoopGroup-3-5] i.n.h.l.LoggingHandler - [id: 0xa4b3be43, L:/192.168.0.103:9090 - R:/192.168.0.103:63641] READ COMPLETE
缺点,处理字符数据比较合适,但如果内容本身包含了分隔符(字节数据常常会有此情况),那么就会解析错误
方法4 预设长度 在发送消息前,先约定用定长字节表示接下来数据的长度
1 2 ch.pipeline().addLast(new LengthFieldBasedFrameDecoder (1024 , 0 , 1 , 0 , 1 ));
客户端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public class HelloWorldClient { static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class); public static void main (String[] args) { NioEventLoopGroup worker = new NioEventLoopGroup (); try { Bootstrap bootstrap = new Bootstrap (); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { log.debug("connetted..." ); ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { log.debug("sending..." ); Random r = new Random (); char c = 'a' ; ByteBuf buffer = ctx.alloc().buffer(); for (int i = 0 ; i < 10 ; i++) { byte length = (byte ) (r.nextInt(16 ) + 1 ); buffer.writeByte(length); for (int j = 1 ; j <= length; j++) { buffer.writeByte((byte ) c); } c++; } ctx.writeAndFlush(buffer); } }); } }); ChannelFuture channelFuture = bootstrap.connect("192.168.0.103" , 9090 ).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("client error" , e); } finally { worker.shutdownGracefully(); } } }
客户端输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 14:37:10 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.HelloWorldClient - connetted... 14:37:10 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0xf0f347b8] REGISTERED 14:37:10 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0xf0f347b8] CONNECT: /192.168.0.103:9090 14:37:10 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0xf0f347b8, L:/192.168.0.103:49979 - R:/192.168.0.103:9090] ACTIVE 14:37:10 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.HelloWorldClient - sending... 14:37:10 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0xf0f347b8, L:/192.168.0.103:49979 - R:/192.168.0.103:9090] WRITE: 97B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 09 61 61 61 61 61 61 61 61 61 09 62 62 62 62 62 |.aaaaaaaaa.bbbbb| |00000010| 62 62 62 62 06 63 63 63 63 63 63 08 64 64 64 64 |bbbb.cccccc.dddd| |00000020| 64 64 64 64 0f 65 65 65 65 65 65 65 65 65 65 65 |dddd.eeeeeeeeeee| |00000030| 65 65 65 65 0d 66 66 66 66 66 66 66 66 66 66 66 |eeee.fffffffffff| |00000040| 66 66 02 67 67 02 68 68 0e 69 69 69 69 69 69 69 |ff.gg.hh.iiiiiii| |00000050| 69 69 69 69 69 69 69 09 6a 6a 6a 6a 6a 6a 6a 6a |iiiiiii.jjjjjjjj| |00000060| 6a |j | +--------+-------------------------------------------------+----------------+ 14:37:10 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0xf0f347b8, L:/192.168.0.103:49979 - R:/192.168.0.103:9090] FLUSH
服务端输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 14:36:50 [DEBUG] [main] c.i.n.HelloWorldServer - [id: 0xdff439d3] binding... 14:36:51 [DEBUG] [main] c.i.n.HelloWorldServer - [id: 0xdff439d3, L:/192.168.0.103:9090] bound... 14:37:10 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] REGISTERED 14:37:10 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] ACTIVE 14:37:10 [DEBUG] [nioEventLoopGroup-3-1] c.i.n.HelloWorldServer - connected [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] 14:37:10 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 9B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 61 61 61 61 61 61 61 61 |aaaaaaaaa | +--------+-------------------------------------------------+----------------+ 14:37:10 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 9B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 62 62 62 62 62 62 62 62 62 |bbbbbbbbb | +--------+-------------------------------------------------+----------------+ 14:37:10 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 6B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 63 63 63 63 63 63 |cccccc | +--------+-------------------------------------------------+----------------+ 14:37:10 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 8B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 64 64 64 64 64 64 64 64 |dddddddd | +--------+-------------------------------------------------+----------------+ 14:37:10 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 15B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 65 65 65 65 65 65 65 65 65 65 65 65 65 65 65 |eeeeeeeeeeeeeee | +--------+-------------------------------------------------+----------------+ 14:37:10 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 13B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 66 66 66 66 66 66 66 66 66 66 66 66 66 |fffffffffffff | +--------+-------------------------------------------------+----------------+ 14:37:10 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 2B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 67 67 |gg | +--------+-------------------------------------------------+----------------+ 14:37:10 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 2B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 68 68 |hh | +--------+-------------------------------------------------+----------------+ 14:37:10 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 14B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 69 69 69 69 69 69 69 69 69 69 69 69 69 69 |iiiiiiiiiiiiii | +--------+-------------------------------------------------+----------------+ 14:37:10 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ: 9B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 6a 6a 6a 6a 6a 6a 6a 6a 6a |jjjjjjjjj | +--------+-------------------------------------------------+----------------+ 14:37:10 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x744f2b47, L:/192.168.0.103:9090 - R:/192.168.0.103:49979] READ COMPLETE
解码器 netty中解决粘包半包:Netty提供了针对封装成帧这种形式下不同方式的拆包器,所谓的拆包其实就是数据的解码,所谓解码就是将网络中的一些原始数据解码成上层应用的数据,那对应在发送数据的时候要按照同样的方式进行数据的编码操作然后发送到网络中
分隔符解码器-LineBasedFrameDecoder
分隔符解码器-DelimiterBasedFrameDecoder
基于长度的域解码器-LengthFieldBasedFrameDecoder
LengthFieldPrepender
其他编解码器
二次编解码 Netty codec 二次编解码
我们把解决半包粘包问题的常用三种解码器叫一次解码器,其作用是将原始数据流(可能会出现粘包和半包的数据流) 转换为用户数据(ByteBuf中存储),但仍然是字节数据,所以我们需要二次解码器将字节数组转换为java对象,或者 将将一种格式转化为另一种格式,方便上层应用程序使用。
一次解码器继承自:ByteToMessageDecoder;二次解码器继承自:MessageToMessageDecoder;但他们的本质 都是继承ChannelInboundHandlerAdapter
用户数据(ByteBuf )和 Java Object之间的转换,或者将将一种格式转化为另一种格式(譬如将应用数据转化成某种 协议数据)。
Java 序列化:不推荐使用,占用空间大,也只有java语言能用
Marshaling:比java序列化稍好
XML :可读性好,但是占用空间大
JSON :可读性也好,空间较小
MessagePack :占用空间比JSON小,可读性不如JSON,但也还行
Protobuf :性能高,体积小,但是可读性差
hessian : 跨语言、高效的二进制序列化协议,整体性能和protobuf差不多。
其他
Protostuff编解码 protobuf 使用:
github
HTTP 编解码 HTTP服务器在我们日常开发中,常见的实现方式就是实现一个Java Web项目,基于Nginx+Tomcat的方式就可以提供HTTP服务。但是很多场景是非Web容器的场景,这个时候再使用Tomcat就大材小用了。这个时候就可以使用基于Netty的HTTP协议。而且基于Netty开发的HTTP服务器有如下优势:
Netty的线程模型和异步非阻塞特性能够支持高并发
相比于Tomcat HTTP,Netty HTTP更加轻量、小巧、可靠,占用资源更少
协议设计与解析 为什么需要协议? TCP/IP 中消息传输基于流的方式,没有边界。
协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则
例如:在网络上传输
是中文一句著名的无标点符号句子,在没有标点符号情况下,这句话有数种拆解方式,而意思却是完全不同,所以常被用作讲述标点符号的重要性
一种解读
另一种解读
如何设计协议呢?其实就是给网络传输的信息加上“标点符号”。但通过分隔符来断句不是很好,因为分隔符本身如果用于传输,那么必须加以区分。因此,下面一种协议较为常用
例如,假设一个中文字符长度为 3,按照上述协议的规则,发送信息方式如下,就不会被接收方弄错意思了
小故事
很久很久以前,一位私塾先生到一家任教。双方签订了一纸协议:“无鸡鸭亦可无鱼肉亦可白菜豆腐不可少不得束修金”。此后,私塾先生虽然认真教课,但主人家则总是给私塾先生以白菜豆腐为菜,丝毫未见鸡鸭鱼肉的款待。私塾先生先是很不解,可是后来也就想通了:主人把鸡鸭鱼肉的钱都会换为束修金的,也罢。至此双方相安无事。
年关将至,一个学年段亦告结束。私塾先生临行时,也不见主人家为他交付束修金,遂与主家理论。然主家亦振振有词:“有协议为证——无鸡鸭亦可,无鱼肉亦可,白菜豆腐不可少,不得束修金。这白纸黑字明摆着的,你有什么要说的呢?”
私塾先生据理力争:“协议是这样的——无鸡,鸭亦可;无鱼,肉亦可;白菜豆腐不可,少不得束修金。”
双方唇枪舌战,你来我往,真个是不亦乐乎!
这里的束修金,也作“束脩”,应当是泛指教师应当得到的报酬
redis 协议举例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 NioEventLoopGroup worker = new NioEventLoopGroup ();byte [] LINE = {13 , 10 }; try { Bootstrap bootstrap = new Bootstrap (); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) { ch.pipeline().addLast(new LoggingHandler ()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelActive (ChannelHandlerContext ctx) { set(ctx); get(ctx); } private void get (ChannelHandlerContext ctx) { ByteBuf buf = ctx.alloc().buffer(); buf.writeBytes("*2" .getBytes()); buf.writeBytes(LINE); buf.writeBytes("$3" .getBytes()); buf.writeBytes(LINE); buf.writeBytes("get" .getBytes()); buf.writeBytes(LINE); buf.writeBytes("$3" .getBytes()); buf.writeBytes(LINE); buf.writeBytes("aaa" .getBytes()); buf.writeBytes(LINE); ctx.writeAndFlush(buf); } private void set (ChannelHandlerContext ctx) { ByteBuf buf = ctx.alloc().buffer(); buf.writeBytes("*3" .getBytes()); buf.writeBytes(LINE); buf.writeBytes("$3" .getBytes()); buf.writeBytes(LINE); buf.writeBytes("set" .getBytes()); buf.writeBytes(LINE); buf.writeBytes("$3" .getBytes()); buf.writeBytes(LINE); buf.writeBytes("aaa" .getBytes()); buf.writeBytes(LINE); buf.writeBytes("$3" .getBytes()); buf.writeBytes(LINE); buf.writeBytes("bbb" .getBytes()); buf.writeBytes(LINE); ctx.writeAndFlush(buf); } @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println(buf.toString(Charset.defaultCharset())); } }); } }); ChannelFuture channelFuture = bootstrap.connect("localhost" , 6379 ).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("client error" , e); } finally { worker.shutdownGracefully(); }
http 协议举例
http协议主要使用CRLF进行分割
请求body里面主要是Post提交的数据(可支持多种格式,格式在Content-Type定义,长度是在Content-Length里面定义)
HTTP 协议抽象-Content-Length,chunked
chunked
HTTP协议通常使用Content-Length来标识body的长度,在服务器端,需要先申请对应长度的buffer,然后再赋值。如果需要一边生产数据一边发送数据,就需要使用”Transfer-Encoding: chunked” 来代替Content-Length,也就是对数据进行分块传输。
Content-Length:http server接收数据时,发现header中有Content-Length属性,则读取Content-Length 的值,确定需要读取body的长度;http server发送数据时,根据需要发送byte的长度,在header中增加 Content-Length 项,其中value为byte的长度,然后将byte数据当做body发送到客户端
chunked:http server接收数据时,发现header中有Transfer-Encoding: chunked,则会按照truncked协议分批读取数据;httpserver发送数据时,如果需要分批发送到客户端,则需要在header中加上 Transfer-Encoding: chunked,然后按照truncked协议分批发送数据
HTTP 协议抽象-响应压缩
数据压缩
开启数据的无损压缩,节省传输的流量,提升数据的加载性能,压缩需要客户端,服务器端同时支持。在chrome中,请求默认会加上Accept-Encoding: gzip, deflate,客户端默认开启数据压缩。
在请求时,需要通过header的Accept-Encoding: gzip, deflate 来告诉服务器客户端支持的压缩类型
在返回时,http server会在返回的header中添加Content-Encoding: gzip 来告诉客户端数据的压缩方式
压缩类型主要包含如下几种:
gzip :说明body采用GNU zip编码
compress :说明body采用Unix的文件压缩程序
deflate :说明body是用zlib的格式压缩的
identity :说明没有对实体进行编码。其中 gzip, compress, 以及deflate编码都是无损压缩算法,不会导致信息损失。 gzip效率最高,使用较为广泛
Netty HTTP 协议抽象的实现-请求和响应
QueryStringDecoder : 主要是对url进行封装,解析path和url上面的参数。(Tips:在tomcat中如果提交的post请求是application/x-www- form-urlencoded,则getParameter获取的是包含url后面和body里面所有的参数,而在netty中,获取的仅仅是url上面的参数)
HttpContent :是对body进行封装,本质上就是一个ByteBuf。如果ByteBuf的长度是固定的,则请求的body过大,可能包含多个HttpContent ,其中最后一个为LastHttpContent(空的HttpContent),用来说明body的结束
response对象的抽象比较类似
HttpRequestDecoder & HttpObjectAggregator & HttpResponseEncoder
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 NioEventLoopGroup boss = new NioEventLoopGroup ();NioEventLoopGroup worker = new NioEventLoopGroup ();try { ServerBootstrap serverBootstrap = new ServerBootstrap (); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.group(boss, worker); serverBootstrap.childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); ch.pipeline().addLast(new HttpServerCodec ()); ch.pipeline().addLast(new SimpleChannelInboundHandler <HttpRequest>() { @Override protected void channelRead0 (ChannelHandlerContext ctx, HttpRequest msg) throws Exception { log.debug(msg.uri()); DefaultFullHttpResponse response = new DefaultFullHttpResponse (msg.protocolVersion(), HttpResponseStatus.OK); byte [] bytes = "<h1>Hello, world!</h1>" .getBytes(); response.headers().setInt(CONTENT_LENGTH, bytes.length); response.content().writeBytes(bytes); ctx.writeAndFlush(response); } }); } }); ChannelFuture channelFuture = serverBootstrap.bind(8080 ).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("server error" , e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); }
Keepalive 与 idle监测 客户端和服务端建立好连接后,长时间没有发送数据,或者网络出问题了,或者应用出问题了。大量的时候可能造成阻塞拥堵。所以要及时检测到。
tcp自己的keepalive,2个小时,太长了,更注重于网络层的是否可用。
应用层 Keepalvie
除了在tcp网络层开启keepalive之外,我们普遍还需要在应用层启动keepalive,一般称之为:应用心跳(心跳机制 ),原因如下:
1、协议分层,各层关注点不同,网络传输层关注网络是否可达,应用层关注是否能正常提供服务
2、tcp的keepalive默认关闭,并且经过路由等中转设备后keepalive包有可能被丢弃
3、tcp层的keepalive时间太长,默认>
2小时,虽然可改,但是属于系统参数一旦改动影响该机器上的所有应用
另外需要注意:http虽然属于应用层协议,因此会经常听到 HTTP 的头信息:Connection: Keep-Alive,HTTP/1.1
默认使用Connection:keep-alive
进行长连接。在一次 TCP 连接中可以完成多个 HTTP 请求,但是对每个请求仍然 要单独发 header,Keep-Alive不会永久保持连接,它有一个保持时间,可以在不同的服务器软件(如Apache)中 设定这个时间。这种长连接是一种“伪链接”,而且只能由客户端发送请求,服务端响应。 HTTP协议的长连接和短连接,实质上是TCP协议的长连接和短连接
Idle 监测,只是负责诊断,诊断后,做出不同的行为,决定Idle 监测的最终用途,一般用来配合keepalive ,减少keepalive 消息
idle监测
idle配合keepalive的发展阶段
v1:定时keepalive 消息,keepalive 消息与服务器正常消息交换完全不关联,定时就发送;
v2:空闲监测+ 判定为Idle 时才发keepalive,有其他数据传输的时候,不发送keepalive ,无数据传输超过一定 时间,判定为Idle,再发keepalive
idle带来的好处
快速释放损坏的、恶意的、很久不用的连接,让系统时刻保持最好的状态
实际应用中:结合起来使用。按需keepalive ,保证不会空闲,如果空闲,关闭连接
idle会用场景
服务器添加read idle check,10s接收不到channel数据就断掉连接,保护自己,瘦身
客户端添加write idle check + keepalive,5s不发送数据就发送一个keepalive,避免连接被断,也避免频繁 keepalive
自定义协议要素
魔数,用来在第一时间判定是否是无效数据包
版本号,可以支持协议的升级
序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
指令类型,是登录、注册、单聊、群聊… 跟业务相关
请求序号,为了双工通信,提供异步能力
正文长度
消息正文
编解码器 根据上面的要素,设计一个登录请求消息和登录响应消息,并使用 Netty 完成收发
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 @Slf4j public class MessageCodec extends ByteToMessageCodec <Message> { @Override protected void encode (ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception { out.writeBytes(new byte []{1 , 2 , 3 , 4 }); out.writeByte(1 ); out.writeByte(0 ); out.writeByte(msg.getMessageType()); out.writeInt(msg.getSequenceId()); out.writeByte(0xff ); ByteArrayOutputStream bos = new ByteArrayOutputStream (); ObjectOutputStream oos = new ObjectOutputStream (bos); oos.writeObject(msg); byte [] bytes = bos.toByteArray(); out.writeInt(bytes.length); out.writeBytes(bytes); } @Override protected void decode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int magicNum = in.readInt(); byte version = in.readByte(); byte serializerType = in.readByte(); byte messageType = in.readByte(); int sequenceId = in.readInt(); in.readByte(); int length = in.readInt(); byte [] bytes = new byte [length]; in.readBytes(bytes, 0 , length); ObjectInputStream ois = new ObjectInputStream (new ByteArrayInputStream (bytes)); Message message = (Message) ois.readObject(); log.debug("{}, {}, {}, {}, {}, {}" , magicNum, version, serializerType, messageType, sequenceId, length); log.debug("{}" , message); out.add(message); } }
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 EmbeddedChannel channel = new EmbeddedChannel ( new LoggingHandler (), new LengthFieldBasedFrameDecoder ( 1024 , 12 , 4 , 0 , 0 ), new MessageCodec () ); LoginRequestMessage message = new LoginRequestMessage ("zhangsan" , "123" , "张三" );ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();new MessageCodec ().encode(null , message, buf);ByteBuf s1 = buf.slice(0 , 100 );ByteBuf s2 = buf.slice(100 , buf.readableBytes() - 100 );s1.retain(); channel.writeInbound(s1); channel.writeInbound(s2);
解读
什么时候可以加 @Sharable💡
当 handler 不保存状态时,就可以安全地在多线程下被共享
但要注意对于编解码器类,不能继承 ByteToMessageCodec 或 CombinedChannelDuplexHandler 父类,他们的构造方法对 @Sharable 有限制
如果能确保编解码器不会保存状态,可以继承 MessageToMessageCodec 父类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 @Slf4j @ChannelHandler .Sharablepublic class MessageCodecSharable extends MessageToMessageCodec <ByteBuf, Message> { @Override protected void encode (ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception { ByteBuf out = ctx.alloc().buffer(); out.writeBytes(new byte []{1 , 2 , 3 , 4 }); out.writeByte(1 ); out.writeByte(0 ); out.writeByte(msg.getMessageType()); out.writeInt(msg.getSequenceId()); out.writeByte(0xff ); ByteArrayOutputStream bos = new ByteArrayOutputStream (); ObjectOutputStream oos = new ObjectOutputStream (bos); oos.writeObject(msg); byte [] bytes = bos.toByteArray(); out.writeInt(bytes.length); out.writeBytes(bytes); outList.add(out); } @Override protected void decode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int magicNum = in.readInt(); byte version = in.readByte(); byte serializerType = in.readByte(); byte messageType = in.readByte(); int sequenceId = in.readInt(); in.readByte(); int length = in.readInt(); byte [] bytes = new byte [length]; in.readBytes(bytes, 0 , length); ObjectInputStream ois = new ObjectInputStream (new ByteArrayInputStream (bytes)); Message message = (Message) ois.readObject(); log.debug("{}, {}, {}, {}, {}, {}" , magicNum, version, serializerType, messageType, sequenceId, length); log.debug("{}" , message); out.add(message); } }
聊天室案例 聊天室业务介绍 1 2 3 4 5 6 7 8 9 10 11 12 13 public interface UserService { boolean login (String username, String password) ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public interface Session { void bind (Channel channel, String username) ; void unbind (Channel channel) ; Object getAttribute (Channel channel, String name) ; void setAttribute (Channel channel, String name, Object value) ; Channel getChannel (String username) ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public interface GroupSession { Group createGroup (String name, Set<String> members) ; Group joinMember (String name, String member) ; Group removeMember (String name, String member) ; Group removeGroup (String name) ; Set<String> getMembers (String name) ; List<Channel> getMembersChannel (String name) ; }
聊天室业务-登录 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @Slf4j public class ChatServer { public static void main (String[] args) { NioEventLoopGroup boss = new NioEventLoopGroup (); NioEventLoopGroup worker = new NioEventLoopGroup (); LoggingHandler LOGGING_HANDLER = new LoggingHandler (LogLevel.DEBUG); MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable (); try { ServerBootstrap serverBootstrap = new ServerBootstrap (); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.group(boss, worker); serverBootstrap.childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProcotolFrameDecoder ()); ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); ch.pipeline().addLast(new SimpleChannelInboundHandler <LoginRequestMessage>() { @Override protected void channelRead0 (ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception { String username = msg.getUsername(); String password = msg.getPassword(); boolean login = UserServiceFactory.getUserService().login(username, password); LoginResponseMessage message; if (login) { message = new LoginResponseMessage (true , "登录成功" ); } else { message = new LoginResponseMessage (false , "用户名或密码不正确" ); } ctx.writeAndFlush(message); } }); } }); Channel channel = serverBootstrap.bind(8080 ).sync().channel(); channel.closeFuture().sync(); } catch (InterruptedException e) { log.error("server error" , e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 @Slf4j public class ChatClient { public static void main (String[] args) { NioEventLoopGroup group = new NioEventLoopGroup (); LoggingHandler LOGGING_HANDLER = new LoggingHandler (LogLevel.DEBUG); MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable (); CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch (1 ); AtomicBoolean LOGIN = new AtomicBoolean (false ); try { Bootstrap bootstrap = new Bootstrap (); bootstrap.channel(NioSocketChannel.class); bootstrap.group(group); bootstrap.handler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProcotolFrameDecoder ()); ch.pipeline().addLast(MESSAGE_CODEC); ch.pipeline().addLast("client handler" , new ChannelInboundHandlerAdapter () { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("msg: {}" , msg); if ((msg instanceof LoginResponseMessage)) { LoginResponseMessage response = (LoginResponseMessage) msg; if (response.isSuccess()) { LOGIN.set(true ); } WAIT_FOR_LOGIN.countDown(); } } @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { new Thread (() -> { Scanner scanner = new Scanner (System.in); System.out.println("请输入用户名:" ); String username = scanner.nextLine(); System.out.println("请输入密码:" ); String password = scanner.nextLine(); LoginRequestMessage message = new LoginRequestMessage (username, password); ctx.writeAndFlush(message); System.out.println("等待后续操作..." ); try { WAIT_FOR_LOGIN.await(); } catch (InterruptedException e) { e.printStackTrace(); } if (!LOGIN.get()) { ctx.channel().close(); return ; } while (true ) { System.out.println("==================================" ); System.out.println("send [username] [content]" ); System.out.println("gsend [group name] [content]" ); System.out.println("gcreate [group name] [m1,m2,m3...]" ); System.out.println("gmembers [group name]" ); System.out.println("gjoin [group name]" ); System.out.println("gquit [group name]" ); System.out.println("quit" ); System.out.println("==================================" ); String command = scanner.nextLine(); String[] s = command.split(" " ); switch (s[0 ]){ case "send" : ctx.writeAndFlush(new ChatRequestMessage (username, s[1 ], s[2 ])); break ; case "gsend" : ctx.writeAndFlush(new GroupChatRequestMessage (username, s[1 ], s[2 ])); break ; case "gcreate" : Set<String> set = new HashSet <>(Arrays.asList(s[2 ].split("," ))); set.add(username); ctx.writeAndFlush(new GroupCreateRequestMessage (s[1 ], set)); break ; case "gmembers" : ctx.writeAndFlush(new GroupMembersRequestMessage (s[1 ])); break ; case "gjoin" : ctx.writeAndFlush(new GroupJoinRequestMessage (username, s[1 ])); break ; case "gquit" : ctx.writeAndFlush(new GroupQuitRequestMessage (username, s[1 ])); break ; case "quit" : ctx.channel().close(); return ; } } }, "system in" ).start(); } }); } }); Channel channel = bootstrap.connect("localhost" , 8080 ).sync().channel(); channel.closeFuture().sync(); } catch (Exception e) { log.error("client error" , e); } finally { group.shutdownGracefully(); } } }
聊天室业务-单聊 服务器端将 handler 独立出来
登录 handler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @ChannelHandler .Sharablepublic class LoginRequestMessageHandler extends SimpleChannelInboundHandler <LoginRequestMessage> { @Override protected void channelRead0 (ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception { String username = msg.getUsername(); String password = msg.getPassword(); boolean login = UserServiceFactory.getUserService().login(username, password); LoginResponseMessage message; if (login) { SessionFactory.getSession().bind(ctx.channel(), username); message = new LoginResponseMessage (true , "登录成功" ); } else { message = new LoginResponseMessage (false , "用户名或密码不正确" ); } ctx.writeAndFlush(message); } }
单聊 handler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @ChannelHandler .Sharablepublic class ChatRequestMessageHandler extends SimpleChannelInboundHandler <ChatRequestMessage> { @Override protected void channelRead0 (ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception { String to = msg.getTo(); Channel channel = SessionFactory.getSession().getChannel(to); if (channel != null ) { channel.writeAndFlush(new ChatResponseMessage (msg.getFrom(), msg.getContent())); } else { ctx.writeAndFlush(new ChatResponseMessage (false , "对方用户不存在或者不在线" )); } } }
聊天室业务-群聊 创建群聊
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @ChannelHandler .Sharablepublic class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler <GroupCreateRequestMessage> { @Override protected void channelRead0 (ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception { String groupName = msg.getGroupName(); Set<String> members = msg.getMembers(); GroupSession groupSession = GroupSessionFactory.getGroupSession(); Group group = groupSession.createGroup(groupName, members); if (group == null ) { ctx.writeAndFlush(new GroupCreateResponseMessage (true , groupName + "创建成功" )); List<Channel> channels = groupSession.getMembersChannel(groupName); for (Channel channel : channels) { channel.writeAndFlush(new GroupCreateResponseMessage (true , "您已被拉入" + groupName)); } } else { ctx.writeAndFlush(new GroupCreateResponseMessage (false , groupName + "已经存在" )); } } }
群聊
1 2 3 4 5 6 7 8 9 10 11 12 @ChannelHandler .Sharablepublic class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler <GroupChatRequestMessage> { @Override protected void channelRead0 (ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception { List<Channel> channels = GroupSessionFactory.getGroupSession() .getMembersChannel(msg.getGroupName()); for (Channel channel : channels) { channel.writeAndFlush(new GroupChatResponseMessage (msg.getFrom(), msg.getContent())); } } }
加入群聊
1 2 3 4 5 6 7 8 9 10 11 12 @ChannelHandler .Sharablepublic class GroupJoinRequestMessageHandler extends SimpleChannelInboundHandler <GroupJoinRequestMessage> { @Override protected void channelRead0 (ChannelHandlerContext ctx, GroupJoinRequestMessage msg) throws Exception { Group group = GroupSessionFactory.getGroupSession().joinMember(msg.getGroupName(), msg.getUsername()); if (group != null ) { ctx.writeAndFlush(new GroupJoinResponseMessage (true , msg.getGroupName() + "群加入成功" )); } else { ctx.writeAndFlush(new GroupJoinResponseMessage (true , msg.getGroupName() + "群不存在" )); } } }
退出群聊
1 2 3 4 5 6 7 8 9 10 11 12 @ChannelHandler .Sharablepublic class GroupQuitRequestMessageHandler extends SimpleChannelInboundHandler <GroupQuitRequestMessage> { @Override protected void channelRead0 (ChannelHandlerContext ctx, GroupQuitRequestMessage msg) throws Exception { Group group = GroupSessionFactory.getGroupSession().removeMember(msg.getGroupName(), msg.getUsername()); if (group != null ) { ctx.writeAndFlush(new GroupJoinResponseMessage (true , "已退出群" + msg.getGroupName())); } else { ctx.writeAndFlush(new GroupJoinResponseMessage (true , msg.getGroupName() + "群不存在" )); } } }
查看成员
1 2 3 4 5 6 7 8 9 @ChannelHandler .Sharablepublic class GroupMembersRequestMessageHandler extends SimpleChannelInboundHandler <GroupMembersRequestMessage> { @Override protected void channelRead0 (ChannelHandlerContext ctx, GroupMembersRequestMessage msg) throws Exception { Set<String> members = GroupSessionFactory.getGroupSession() .getMembers(msg.getGroupName()); ctx.writeAndFlush(new GroupMembersResponseMessage (members)); } }
聊天室业务-退出 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Slf4j @ChannelHandler.Sharable public class QuitHandler extends ChannelInboundHandlerAdapter { // 当连接断开时触发 inactive 事件 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { SessionFactory.getSession().unbind(ctx.channel()); log.debug("{} 已经断开", ctx.channel()); } // 当出现异常时触发 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { SessionFactory.getSession().unbind(ctx.channel()); log.debug("{} 已经异常断开 异常是{}", ctx.channel(), cause.getMessage()); } }
聊天室业务-空闲检测 连接假死 原因
网络设备出现故障,例如网卡,机房等,底层的 TCP 连接已经断开了,但应用程序没有感知到,仍然占用着资源。
公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,就这么一直耗着
应用程序线程阻塞,无法进行数据读写
问题
假死的连接占用的资源不能自动释放
向假死的连接发送数据,得到的反馈是发送超时
服务器端解决
怎么判断客户端连接是否假死呢?如果能收到客户端数据,说明没有假死。因此策略就可以定为,每隔一段时间就检查这段时间内是否接收到客户端数据,没有就可以判定为连接假死
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ch.pipeline().addLast(new IdleStateHandler (5 , 0 , 0 )); ch.pipeline().addLast(new ChannelDuplexHandler () { @Override public void userEventTriggered (ChannelHandlerContext ctx, Object evt) throws Exception{ IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.debug("已经 5s 没有读到数据了" ); ctx.channel().close(); } } });
客户端定时心跳
客户端可以定时向服务器端发送数据,只要这个时间间隔小于服务器定义的空闲检测的时间间隔,那么就能防止前面提到的误判,客户端可以定义如下心跳处理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ch.pipeline().addLast(new IdleStateHandler (0 , 3 , 0 )); ch.pipeline().addLast(new ChannelDuplexHandler () { @Override public void userEventTriggered (ChannelHandlerContext ctx, Object evt) throws Exception{ IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.WRITER_IDLE) { ctx.writeAndFlush(new PingMessage ()); } } });
Netty优化 优化 扩展序列化算法 序列化,反序列化主要用在消息正文的转换上
序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])
反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理
目前的代码仅支持 Java 自带的序列化,反序列化机制,核心代码如下
1 2 3 4 5 6 7 8 9 10 11 byte [] body = new byte [bodyLength];byteByf.readBytes(body); ObjectInputStream in = new ObjectInputStream (new ByteArrayInputStream (body));Message message = (Message) in.readObject();message.setSequenceId(sequenceId); ByteArrayOutputStream out = new ByteArrayOutputStream ();new ObjectOutputStream (out).writeObject(message);byte [] bytes = out.toByteArray();
为了支持更多序列化算法,抽象一个 Serializer 接口
1 2 3 4 5 6 7 8 9 public interface Serializer { <T> T deserialize (Class<T> clazz, byte [] bytes) ; <T> byte [] serialize(T object); }
提供两个实现,我这里直接将实现加入了枚举类 Serializer.Algorithm 中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 enum SerializerAlgorithm implements Serializer { Java { @Override public <T> T deserialize (Class<T> clazz, byte [] bytes) { try { ObjectInputStream in = new ObjectInputStream (new ByteArrayInputStream (bytes)); Object object = in.readObject(); return (T) object; } catch (IOException | ClassNotFoundException e) { throw new RuntimeException ("SerializerAlgorithm.Java 反序列化错误" , e); } } @Override public <T> byte [] serialize(T object) { try { ByteArrayOutputStream out = new ByteArrayOutputStream (); new ObjectOutputStream (out).writeObject(object); return out.toByteArray(); } catch (IOException e) { throw new RuntimeException ("SerializerAlgorithm.Java 序列化错误" , e); } } }, Json { @Override public <T> T deserialize (Class<T> clazz, byte [] bytes) { return new Gson ().fromJson(new String (bytes, StandardCharsets.UTF_8), clazz); } @Override public <T> byte [] serialize(T object) { return new Gson ().toJson(object).getBytes(StandardCharsets.UTF_8); } }; public static SerializerAlgorithm getByInt (int type) { SerializerAlgorithm[] array = SerializerAlgorithm.values(); if (type < 0 || type > array.length - 1 ) { throw new IllegalArgumentException ("超过 SerializerAlgorithm 范围" ); } return array[type]; } }
增加配置类和配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public abstract class Config { static Properties properties; static { try (InputStream in = Config.class.getResourceAsStream("/application.properties" )) { properties = new Properties (); properties.load(in); } catch (IOException e) { throw new ExceptionInInitializerError (e); } } public static int getServerPort () { String value = properties.getProperty("server.port" ); if (value == null ) { return 8080 ; } else { return Integer.parseInt(value); } } public static Serializer.Algorithm getSerializerAlgorithm () { String value = properties.getProperty("serializer.algorithm" ); if (value == null ) { return Serializer.Algorithm.Java; } else { return Serializer.Algorithm.valueOf(value); } } }
配置文件
1 serializer.algorithm =Json
修改编解码器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public class MessageCodecSharable extends MessageToMessageCodec <ByteBuf, Message> { @Override public void encode (ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception { ByteBuf out = ctx.alloc().buffer(); out.writeBytes(new byte []{1 , 2 , 3 , 4 }); out.writeByte(1 ); out.writeByte(Config.getSerializerAlgorithm().ordinal()); out.writeByte(msg.getMessageType()); out.writeInt(msg.getSequenceId()); out.writeByte(0xff ); byte [] bytes = Config.getSerializerAlgorithm().serialize(msg); out.writeInt(bytes.length); out.writeBytes(bytes); outList.add(out); } @Override protected void decode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int magicNum = in.readInt(); byte version = in.readByte(); byte serializerAlgorithm = in.readByte(); byte messageType = in.readByte(); int sequenceId = in.readInt(); in.readByte(); int length = in.readInt(); byte [] bytes = new byte [length]; in.readBytes(bytes, 0 , length); Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm]; Class<? extends Message > messageClass = Message.getMessageClass(messageType); Message message = algorithm.deserialize(messageClass, bytes); out.add(message); } }
其中确定具体消息类型,可以根据 消息类型字节
获取到对应的 消息 class
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 @Data public abstract class Message implements Serializable { public static Class<? extends Message > getMessageClass(int messageType) { return messageClasses.get(messageType); } private int sequenceId; private int messageType; public abstract int getMessageType () ; public static final int LoginRequestMessage = 0 ; public static final int LoginResponseMessage = 1 ; public static final int ChatRequestMessage = 2 ; public static final int ChatResponseMessage = 3 ; public static final int GroupCreateRequestMessage = 4 ; public static final int GroupCreateResponseMessage = 5 ; public static final int GroupJoinRequestMessage = 6 ; public static final int GroupJoinResponseMessage = 7 ; public static final int GroupQuitRequestMessage = 8 ; public static final int GroupQuitResponseMessage = 9 ; public static final int GroupChatRequestMessage = 10 ; public static final int GroupChatResponseMessage = 11 ; public static final int GroupMembersRequestMessage = 12 ; public static final int GroupMembersResponseMessage = 13 ; public static final int PingMessage = 14 ; public static final int PongMessage = 15 ; private static final Map<Integer, Class<? extends Message >> messageClasses = new HashMap <>(); static { messageClasses.put(LoginRequestMessage, LoginRequestMessage.class); messageClasses.put(LoginResponseMessage, LoginResponseMessage.class); messageClasses.put(ChatRequestMessage, ChatRequestMessage.class); messageClasses.put(ChatResponseMessage, ChatResponseMessage.class); messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class); messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class); messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class); messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class); messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class); messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class); messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class); messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class); messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class); messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class); } }
参数调优 System 参数 linux系统参数/Netty支持的系统参数
linux系统参数,例如:/proc/sys/net/ipv4/tcp_keepalive_time
netty支持的系统参数设置,例如:serverbootstrap.option(ChannelOption.SO_BACKLOG,1024),且设置形式有两种:
1、针对ServerSocketChannel:通过.option设置
2、针对SocketChannel:通过.childOption设置
Linux参数:
进行tcp连接时,系统为每个tcp连接都会创建一个socket句柄,其实就是一个文件句柄(linux一切皆为文件),但是系统对于每个进程能够打开的文件句柄数量
做了限制,超出则报错:Too many open file设置方式:有很多种,ulimit -n [xxx]
注意:该命令修改的数值,只对当前登录用户目前使用的环境有效,系统重启或用户退出后失效,所以建议的做法是可以作为启动脚本的一部分,在启动程序前执行
Netty支持的 System 参数 Netty如何设置?
针对ScoketChannel,7个,通过.childOption设置,常用的两个如下:
1、SO_KEEPALIVE ,tcp层keepalvie,默认关闭,一般选择关闭tcp keepalive 而使用应用keepalive
2、TCP_NODELAY :设置是否启用nagle算法,该算法是tcp在发送数据时将小的、碎片化的数据拼接成一个大的报文一起发送,以此来提高效率,默认是false(启用),如果启用可能会导致有些数据有延时,如果业务不能忍受,小报文也需要立即发送则可以禁用该算法
针对ServerScoketChannel,3个,通过.Option设置,常用的一个如下:
1、SO_BACKLOG :最大等待连接数量,netty在linux下该值的获取是通过:io.netty.util.NetUtil完成的
1.1、先尝试获取:/proc/sys/net/core/somaxconn
1.2、然后尝试:sysctl
1.3、最终没有获取到使用默认值:PlatformDependent.isWindows() ? 200 : 128
;
应用诊断 完善线程名
添加Handler名称 & 日志 netty 自己内部有日志选择逻辑
线程模型优化 这里说的就是1+m+n里的n,一般是业务拆出去
零拷贝 操作系统中的零拷贝
系统内核处理 IO 操作分为两个阶段:等待数据和拷贝数据:
1、等待数据,就是系统内核在等待网卡接收到数据后,把数据写到内核中。
2、拷贝数据,就是系统内核在获取到数据后,将数据拷贝到用户进程的空间中。
操作系统中的零拷贝
在 OS 层面上的 Zero-copy 通常指避免在 用户态(User-space) 与 内核态(Kernel-space) 之间来回拷贝数据. 例如 Linux 提供的 mmap 系统调用, 它可以将一段用户空间内存映射到内核空间, 当映射成功后, 用户对这段内存区域的修改可以直接反映到内核空间; 同样地, 内核空间对这段区域的修改也直接反映用户空间. 正因为有这样的映射关系, 我们就不需要在 用户态(User-space) 与 内核态(Kernel-space) 之间拷贝数据, 提高了数据传输的效率
明显的图中有两步是多余的数据拷贝,通过java的FileChannel.transferTo方法(底层基于NIO),可以避免上面两次多余的拷贝(当然这需要底层操作系统的支持)
Netty中的零拷贝
Netty 中的 Zero-copy 与上面我们所提到到 OS 层面上的 Zero-copy 不太一样, Netty的 Zero-coyp 完全是在用户态(Java 层面)的,它的 Zero-copy 的更多的是偏向于 优化数据操作 这样的概念,Netty的Zero-copy主要体现在如下几个方面:
1、Direct Buffer: 直接堆外内存区域分配空间而不是在堆内存中分配, 如果使用传统的堆内存分配,当我们需要将数据通过socket发送的时候,需要将数据从堆内存拷贝到堆外直接内存,然后再由直接内存拷贝到网卡接口层,通过Netty提供的DirectBuffers直接将数据分配到堆外内存,避免多余的数据拷贝
2、 Composite Buffers:传统的ByteBuffer,如果需要将两个ByteBuffer中的数据组合到一起,我们需要首先创建一个size=size1+size2大小的新的数组,然后将两个数组中的数据拷贝到新的数组中。但是使用Netty提供的组合ByteBuf,就可以避免这样的操作,因为CompositeByteBuf并没有真正将多个Buffer组合起来,而是保存了它们的引用,从而避免了数据的拷贝,实现了零拷贝;同时也支持 slice 操作, 因此可以将 ByteBuf 分解为多个共享同一个存储区域的 ByteBuf, 避免了内存的拷贝。
3、通过 wrap 操作, 我们可以将 byte[] 数组、ByteBuf、ByteBuffer等包装成一个 Netty ByteBuf 对象, 进而避免了拷贝操作
4、通过 FileRegion 包装的FileChannel.tranferTo (Java nio)实现文件传输, 可以直接将文件缓冲区的数据发送到目标Channel, 避免了传统通过循环 write 方式导致的内存拷贝问题
1)CONNECT_TIMEOUT_MILLIS
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Slf4j public class TestConnectionTimeout { public static void main (String[] args) { NioEventLoopGroup group = new NioEventLoopGroup (); try { Bootstrap bootstrap = new Bootstrap () .group(group) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300 ) .channel(NioSocketChannel.class) .handler(new LoggingHandler ()); ChannelFuture future = bootstrap.connect("127.0.0.1" , 8080 ); future.sync().channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); log.debug("timeout" ); } finally { group.shutdownGracefully(); } } }
另外源码部分 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Override public final void connect ( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0 ) { connectTimeoutFuture = eventLoop().schedule(new Runnable () { @Override public void run () { ChannelPromise connectPromise = AbstractNioChannel.this .connectPromise; ConnectTimeoutException cause = new ConnectTimeoutException ("connection timed out: " + remoteAddress); if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } }
2)SO_BACKLOG
属于 ServerSocketChannal 参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 sequenceDiagram participant c as client participant s as server participant sq as syns queue participant aq as accept queue s ->> s : bind() s ->> s : listen() c ->> c : connect() c ->> s : 1. SYN Note left of c : SYN_SEND s ->> sq : put Note right of s : SYN_RCVD s ->> c : 2. SYN + ACK Note left of c : ESTABLISHED c ->> s : 3. ACK sq ->> aq : put Note right of s : ESTABLISHED aq -->> s : s ->> s : accept()
第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue
其中
netty 中
可以通过 option(ChannelOption.SO_BACKLOG, 值) 来设置大小
可以通过下面源码查看默认大小
1 2 3 4 5 6 public class DefaultServerSocketChannelConfig extends DefaultChannelConfig implements ServerSocketChannelConfig { private volatile int backlog = NetUtil.SOMAXCONN; }
课堂调试关键断点为:io.netty.channel.nio.NioEventLoop#processSelectedKey
oio 中更容易说明,不用 debug 模式
1 2 3 4 5 6 7 8 public class Server { public static void main (String[] args) throws IOException { ServerSocket ss = new ServerSocket (8888 , 2 ); Socket accept = ss.accept(); System.out.println(accept); System.in.read(); } }
客户端启动 4 个
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class Client { public static void main (String[] args) throws IOException { try { Socket s = new Socket (); System.out.println(new Date ()+" connecting..." ); s.connect(new InetSocketAddress ("localhost" , 8888 ),1000 ); System.out.println(new Date ()+" connected..." ); s.getOutputStream().write(1 ); System.in.read(); } catch (IOException e) { System.out.println(new Date ()+" connecting timeout..." ); e.printStackTrace(); } } }
第 1,2,3 个客户端都打印,但除了第一个处于 accpet 外,其它两个都处于 accept queue 中
1 2 Tue Apr 21 20 :30 :28 CST 2020 connecting... Tue Apr 21 20 :30 :28 CST 2020 connected...
第 4 个客户端连接时
1 2 3 Tue Apr 21 20:53:58 CST 2020 connecting... Tue Apr 21 20:53:59 CST 2020 connecting timeout... java.net.SocketTimeoutException: connect timed out
3)ulimit -n
4)TCP_NODELAY
5)SO_SNDBUF & SO_RCVBUF
SO_SNDBUF 属于 SocketChannal 参数
SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)
6)ALLOCATOR
属于 SocketChannal 参数
用来分配 ByteBuf, ctx.alloc()
7)RCVBUF_ALLOCATOR
属于 SocketChannal 参数
控制 netty 接收缓冲区大小
负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定
RPC 框架 1)准备工作 这些代码可以认为是现成的,无需从头编写练习
为了简化起见,在原来聊天项目的基础上新增 Rpc 请求和响应消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Data public abstract class Message implements Serializable { public static final int RPC_MESSAGE_TYPE_REQUEST = 101 ; public static final int RPC_MESSAGE_TYPE_RESPONSE = 102 ; static { messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class); messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class); } }
请求消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Getter @ToString(callSuper = true) public class RpcRequestMessage extends Message { private String interfaceName; private String methodName; private Class<?> returnType; private Class[] parameterTypes; private Object[] parameterValue; public RpcRequestMessage (int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) { super .setSequenceId(sequenceId); this .interfaceName = interfaceName; this .methodName = methodName; this .returnType = returnType; this .parameterTypes = parameterTypes; this .parameterValue = parameterValue; } @Override public int getMessageType () { return RPC_MESSAGE_TYPE_REQUEST; } }
响应消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Data @ToString(callSuper = true) public class RpcResponseMessage extends Message { private Object returnValue; private Exception exceptionValue; @Override public int getMessageType () { return RPC_MESSAGE_TYPE_RESPONSE; } }
服务器架子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Slf4j public class RpcServer { public static void main (String[] args) { NioEventLoopGroup boss = new NioEventLoopGroup (); NioEventLoopGroup worker = new NioEventLoopGroup (); LoggingHandler LOGGING_HANDLER = new LoggingHandler (LogLevel.DEBUG); MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable (); RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler (); try { ServerBootstrap serverBootstrap = new ServerBootstrap (); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.group(boss, worker); serverBootstrap.childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProcotolFrameDecoder ()); ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); ch.pipeline().addLast(RPC_HANDLER); } }); Channel channel = serverBootstrap.bind(8080 ).sync().channel(); channel.closeFuture().sync(); } catch (InterruptedException e) { log.error("server error" , e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
客户端架子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class RpcClient { public static void main (String[] args) { NioEventLoopGroup group = new NioEventLoopGroup (); LoggingHandler LOGGING_HANDLER = new LoggingHandler (LogLevel.DEBUG); MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable (); RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler (); try { Bootstrap bootstrap = new Bootstrap (); bootstrap.channel(NioSocketChannel.class); bootstrap.group(group); bootstrap.handler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProcotolFrameDecoder ()); ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); ch.pipeline().addLast(RPC_HANDLER); } }); Channel channel = bootstrap.connect("localhost" , 8080 ).sync().channel(); channel.closeFuture().sync(); } catch (Exception e) { log.error("client error" , e); } finally { group.shutdownGracefully(); } } }
服务器端的 service 获取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class ServicesFactory { static Properties properties; static Map<Class<?>, Object> map = new ConcurrentHashMap <>(); static { try (InputStream in = Config.class.getResourceAsStream("/application.properties" )) { properties = new Properties (); properties.load(in); Set<String> names = properties.stringPropertyNames(); for (String name : names) { if (name.endsWith("Service" )) { Class<?> interfaceClass = Class.forName(name); Class<?> instanceClass = Class.forName(properties.getProperty(name)); map.put(interfaceClass, instanceClass.newInstance()); } } } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) { throw new ExceptionInInitializerError (e); } } public static <T> T getService (Class<T> interfaceClass) { return (T) map.get(interfaceClass); } }
相关配置 application.properties
1 2 serializer.algorithm=Json cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl
2)服务器 handler 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Slf4j @ChannelHandler .Sharablepublic class RpcRequestMessageHandler extends SimpleChannelInboundHandler <RpcRequestMessage> { @Override protected void channelRead0 (ChannelHandlerContext ctx, RpcRequestMessage message) { RpcResponseMessage response = new RpcResponseMessage (); response.setSequenceId(message.getSequenceId()); try { HelloService service = (HelloService) ServicesFactory.getService(Class.forName(message.getInterfaceName())); Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes()); Object invoke = method.invoke(service, message.getParameterValue()); response.setReturnValue(invoke); } catch (Exception e) { e.printStackTrace(); response.setExceptionValue(e); } ctx.writeAndFlush(response); } }
3)客户端代码第一版 只发消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @Slf4j public class RpcClient { public static void main (String[] args) { NioEventLoopGroup group = new NioEventLoopGroup (); LoggingHandler LOGGING_HANDLER = new LoggingHandler (LogLevel.DEBUG); MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable (); RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler (); try { Bootstrap bootstrap = new Bootstrap (); bootstrap.channel(NioSocketChannel.class); bootstrap.group(group); bootstrap.handler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProcotolFrameDecoder ()); ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); ch.pipeline().addLast(RPC_HANDLER); } }); Channel channel = bootstrap.connect("localhost" , 8080 ).sync().channel(); ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage ( 1 , "cn.itcast.server.service.HelloService" , "sayHello" , String.class, new Class []{String.class}, new Object []{"张三" } )).addListener(promise -> { if (!promise.isSuccess()) { Throwable cause = promise.cause(); log.error("error" , cause); } }); channel.closeFuture().sync(); } catch (Exception e) { log.error("client error" , e); } finally { group.shutdownGracefully(); } } }
4)客户端 handler 第一版 1 2 3 4 5 6 7 8 @Slf4j @ChannelHandler .Sharablepublic class RpcResponseMessageHandler extends SimpleChannelInboundHandler <RpcResponseMessage> { @Override protected void channelRead0 (ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception { log.debug("{}" , msg); } }
5)客户端代码 第二版 包括 channel 管理,代理,接收结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 @Slf4j public class RpcClientManager { public static void main (String[] args) { HelloService service = getProxyService(HelloService.class); System.out.println(service.sayHello("zhangsan" )); } public static <T> T getProxyService (Class<T> serviceClass) { ClassLoader loader = serviceClass.getClassLoader(); Class<?>[] interfaces = new Class []{serviceClass}; Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> { int sequenceId = SequenceIdGenerator.nextId(); RpcRequestMessage msg = new RpcRequestMessage ( sequenceId, serviceClass.getName(), method.getName(), method.getReturnType(), method.getParameterTypes(), args ); getChannel().writeAndFlush(msg); DefaultPromise<Object> promise = new DefaultPromise <>(getChannel().eventLoop()); RpcResponseMessageHandler.PROMISES.put(sequenceId, promise); promise.await(); if (promise.isSuccess()) { return promise.getNow(); } else { throw new RuntimeException (promise.cause()); } }); return (T) o; } private static Channel channel = null ; private static final Object LOCK = new Object (); public static Channel getChannel () { if (channel != null ) { return channel; } synchronized (LOCK) { if (channel != null ) { return channel; } initChannel(); return channel; } } private static void initChannel () { NioEventLoopGroup group = new NioEventLoopGroup (); LoggingHandler LOGGING_HANDLER = new LoggingHandler (LogLevel.DEBUG); MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable (); RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler (); Bootstrap bootstrap = new Bootstrap (); bootstrap.channel(NioSocketChannel.class); bootstrap.group(group); bootstrap.handler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProcotolFrameDecoder ()); ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); ch.pipeline().addLast(RPC_HANDLER); } }); try { channel = bootstrap.connect("localhost" , 8080 ).sync().channel(); channel.closeFuture().addListener(future -> { group.shutdownGracefully(); }); } catch (Exception e) { log.error("client error" , e); } } }
6)客户端 handler 第二版 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Slf4j @ChannelHandler .Sharablepublic class RpcResponseMessageHandler extends SimpleChannelInboundHandler <RpcResponseMessage> { public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap <>(); @Override protected void channelRead0 (ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception { log.debug("{}" , msg); Promise<Object> promise = PROMISES.remove(msg.getSequenceId()); if (promise != null ) { Object returnValue = msg.getReturnValue(); Exception exceptionValue = msg.getExceptionValue(); if (exceptionValue != null ) { promise.setFailure(exceptionValue); } else { promise.setSuccess(returnValue); } } } }
Netty源码 源码分析 启动剖析 我们就来看看 netty 中对下面的代码是怎样进行处理的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 Selector selector = Selector.open(); NioServerSocketChannel attachment = new NioServerSocketChannel ();ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false ); SelectionKey selectionKey = serverSocketChannel.register(selector, 0 , attachment);serverSocketChannel.bind(new InetSocketAddress (8080 )); selectionKey.interestOps(SelectionKey.OP_ACCEPT);
入口 io.netty.bootstrap.ServerBootstrap#bind
关键代码 io.netty.bootstrap.AbstractBootstrap#doBind
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 private ChannelFuture doBind (final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null ) { return regFuture; } if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { final PendingRegistrationPromise promise = new PendingRegistrationPromise (channel); regFuture.addListener(new ChannelFutureListener () { @Override public void operationComplete (ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null ) { promise.setFailure(cause); } else { promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
关键代码 io.netty.bootstrap.AbstractBootstrap#initAndRegister
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 final ChannelFuture initAndRegister () { Channel channel = null ; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { return new DefaultChannelPromise (new FailedChannel (), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null ) { } return regFuture; }
关键代码 io.netty.bootstrap.ServerBootstrap#init
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 void init (Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0 )); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0 )); } p.addLast(new ChannelInitializer <Channel>() { @Override public void initChannel (final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null ) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable () { @Override public void run () { pipeline.addLast(new ServerBootstrapAcceptor ( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#register
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public final void register (EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this .eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable () { @Override public void run () { register0(promise); } }); } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private void register0 (ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return ; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false ; registered = true ; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
关键代码 io.netty.channel.ChannelInitializer#initChannel
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private boolean initChannel (ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { try { initChannel((C) ctx.channel()); } catch (Throwable cause) { exceptionCaught(ctx, cause); } finally { ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this ) != null ) { pipeline.remove(this ); } } return true ; } return false ; }
关键代码 io.netty.bootstrap.AbstractBootstrap#doBind0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private static void doBind0 ( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable () { @Override public void run () { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#bind
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public final void bind (final SocketAddress localAddress, final ChannelPromise promise) { assertEventLoop(); if (!promise.setUncancellable() || !ensureOpen(promise)) { return ; } if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) { } boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return ; } if (!wasActive && isActive()) { invokeLater(new Runnable () { @Override public void run () { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }
3.3 关键代码 io.netty.channel.socket.nio.NioServerSocketChannel#doBind
1 2 3 4 5 6 7 protected void doBind (SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7 ) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
3.4 关键代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
1 2 3 4 5 public void channelActive (ChannelHandlerContext ctx) { ctx.fireChannelActive(); readIfIsAutoRead(); }
关键代码 io.netty.channel.nio.AbstractNioChannel#doBeginRead
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 protected void doBeginRead () throws Exception { final SelectionKey selectionKey = this .selectionKey; if (!selectionKey.isValid()) { return ; } readPending = true ; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0 ) { selectionKey.interestOps(interestOps | readInterestOp); } }
NioEventLoop 剖析 NioEventLoop 线程不仅要处理 IO 事件,还要处理 Task(包括普通任务和定时任务),
提交任务代码 io.netty.util.concurrent.SingleThreadEventExecutor#execute
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public void execute (Runnable task) { if (task == null ) { throw new NullPointerException ("task" ); } boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { startThread(); if (isShutdown()) { } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
唤醒 select 阻塞线程io.netty.channel.nio.NioEventLoop#wakeup
1 2 3 4 5 6 @Override protected void wakeup (boolean inEventLoop) { if (!inEventLoop && wakenUp.compareAndSet(false , true )) { selector.wakeup(); } }
启动 EventLoop 主循环 io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private void doStartThread () { assert thread == null ; executor.execute(new Runnable () { @Override public void run () { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false ; updateLastExecutionTime(); try { SingleThreadEventExecutor.this .run(); success = true ; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: " , t); } finally { } } }); }
io.netty.channel.nio.NioEventLoop#run
主要任务是执行死循环,不断看有没有新任务,有没有 IO 事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 protected void run () { for (;;) { try { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue ; case SelectStrategy.BUSY_WAIT: case SelectStrategy.SELECT: boolean oldWakenUp = wakenUp.getAndSet(false ); select(oldWakenUp); if (wakenUp.get()) { selector.wakeup(); } default : } } catch (IOException e) { rebuildSelector0(); handleLoopException(e); continue ; } cancelledKeys = 0 ; needsToSelectAgain = false ; final int ioRatio = this .ioRatio; if (ioRatio == 100 ) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return ; } } } catch (Throwable t) { handleLoopException(t); } } }
⚠️ 注意
这里有个费解的地方就是 wakeup,它既可以由提交任务的线程来调用(比较好理解),也可以由 EventLoop 线程来调用(比较费解),这里要知道 wakeup 方法的效果:
由非 EventLoop 线程调用,会唤醒当前在执行 select 阻塞的 EventLoop 线程
由 EventLoop 自己调用,会本次的 wakeup 会取消下一次的 select 操作
参考下图
io.netty.channel.nio.NioEventLoop#select
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 private void select (boolean oldWakenUp) throws IOException { Selector selector = this .selector; try { int selectCnt = 0 ; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L ) / 1000000L ; if (timeoutMillis <= 0 ) { if (selectCnt == 0 ) { selector.selectNow(); selectCnt = 1 ; } break ; } if (hasTasks() && wakenUp.compareAndSet(false , true )) { selector.selectNow(); selectCnt = 1 ; break ; } int selectedKeys = selector.select(timeoutMillis); selectCnt ++; if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break ; } if (Thread.interrupted()) { selectCnt = 1 ; break ; } long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { selectCnt = 1 ; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { selector = selectRebuildSelector(selectCnt); selectCnt = 1 ; break ; } currentTimeNanos = time; } if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { } } catch (CancelledKeyException e) { } }
处理 keys io.netty.channel.nio.NioEventLoop#processSelectedKeys
1 2 3 4 5 6 7 8 9 private void processSelectedKeys () { if (selectedKeys != null ) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
io.netty.channel.nio.NioEventLoop#processSelectedKey
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private void processSelectedKey (SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { return ; } try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0 ) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } if ((readyOps & SelectionKey.OP_WRITE) != 0 ) { ch.unsafe().forceFlush(); } if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0 ) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
accept 剖析 nio 中如下代码,在 netty 中的流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); if (key.isAcceptable()) { SocketChannel channel = serverSocketChannel.accept(); channel.configureBlocking(false ); channel.register(selector, SelectionKey.OP_READ); } }
先来看可接入事件处理(accept)
io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 public void read () { assert eventLoop () .inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false ; Throwable exception = null ; try { try { do { int localRead = doReadMessages(readBuf); if (localRead == 0 ) { break ; } if (localRead < 0 ) { closed = true ; break ; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0 ; i < size; i ++) { readPending = false ; pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null ) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown = true ; if (isOpen()) { close(voidPromise()); } } } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } }
关键代码 io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public void channelRead (ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener () { @Override public void operationComplete (ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
又回到了熟悉的 io.netty.channel.AbstractChannel.AbstractUnsafe#register
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public final void register (EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this .eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable () { @Override public void run () { register0(promise); } }); } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private void register0 (ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return ; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false ; registered = true ; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
回到了熟悉的代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
1 2 3 4 5 public void channelActive (ChannelHandlerContext ctx) { ctx.fireChannelActive(); readIfIsAutoRead(); }
io.netty.channel.nio.AbstractNioChannel#doBeginRead
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 protected void doBeginRead () throws Exception { final SelectionKey selectionKey = this .selectionKey; if (!selectionKey.isValid()) { return ; } readPending = true ; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0 ) { selectionKey.interestOps(interestOps | readInterestOp); } }
read 剖析 再来看可读事件 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
,注意发送的数据未必能够一次读完,因此会触发多次 nio read 事件,一次事件内会触发多次 pipeline read,一次事件会触发一次 pipeline read complete
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public final void read () { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return ; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null ; boolean close = false ; try { do { byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0 ) { byteBuf.release(); byteBuf = null ; close = allocHandle.lastBytesRead() < 0 ; if (close) { readPending = false ; } break ; } allocHandle.incMessagesRead(1 ); readPending = false ; pipeline.fireChannelRead(byteBuf); byteBuf = null ; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } }
io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#continueReading(io.netty.util.UncheckedBooleanSupplier)
1 2 3 4 5 6 7 8 9 10 11 12 public boolean continueReading (UncheckedBooleanSupplier maybeMoreDataSupplier) { return config.isAutoRead() && (!respectMaybeMoreData || maybeMoreDataSupplier.get()) && totalMessages < maxMessagePerRead && totalBytesRead > 0 ; }
附录 官网
源码
博客 芋道源码 netty
书籍
视频