BIO、NIO、AIO系列二:Netty

xiaoxiao2021-03-01  25

一、概述

Netty是一个Java的开源框架。提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

Netty是一个NIO客户端,服务端框架。允许快速简单的开发网络应用程序。例如:服务端和客户端之间的协议,它简化了网络编程规范。

二、NIO开发的问题

1、NIO类库和API复杂,使用麻烦。

2、需要具备Java多线程编程能力(涉及到Reactor模式)。

3、客户端断线重连、网络不稳定、半包读写、失败缓存、网络阻塞和异常码流等问题处理难度非常大

4、存在部分BUG

NIO进行服务器开发的步骤很复杂有以下步骤:

1、创建ServerSocketChannel,配置为非阻塞模式;

2、绑定监听,配置TCP参数;

3、创建一个独立的IO线程,用于轮询多路复用器Selector;

4、创建Selector,将之前创建的ServerSocketChannel注册到Selector上,监听Accept事件;

5、启动IO线程,在循环中执行Select.select()方法,轮询就绪的Channel;

6、当轮询到处于就绪状态的Channel时,需要对其进行判断,如果是OP_ACCEPT状态,说明有新的客户端接入,则调用ServerSocketChannel.accept()方法接受新的客户端;

7、设置新接入的客户端链路SocketChannel为非阻塞模式,配置TCP参数;

8、将SocketChannel注册到Selector上,监听READ事件;

9、如果轮询的Channel为OP_READ,则说明SocketChannel中有新的准备就绪的数据包需要读取,则构造ByteBuffer对象,读取数据包;

10、如果轮询的Channel为OP_WRITE,则说明还有数据没有发送完成,需要继续发送。

三、Netty的优点

1、API使用简单,开发门槛低;

2、功能强大,预置了多种编解码功能,支持多种主流协议;

3、定制功能强,可以通过ChannelHandler对通信框架进行灵活的扩展;

4、性能高,通过与其他业界主流的NIO框架对比,Netty综合性能最优;

5、成熟、稳定,Netty修复了已经发现的NIO所有BUG;

6、社区活跃;

7、经历了很多商用项目的考验。

四、Netty使用demo示例

服务端TimeServer.java

1 package com.studyio.netty; 2 3 import io.netty.bootstrap.ServerBootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioServerSocketChannel; 11 12 /** 13 * 14 * @author lgs 15 * 服务端 16 */ 17 public class TimeServer { 18 19 public static void main(String[] args) throws Exception { 20 int port=8080; //服务端默认端口 21 new TimeServer().bind(port); 22 } 23 24 public void bind(int port) throws Exception{ 25 //1用于服务端接受客户端的连接 线程池里面只有一个线程 26 EventLoopGroup acceptorGroup = new NioEventLoopGroup(1); 27 //2用于进行SocketChannel的网络读写 线程池有多个线程 28 EventLoopGroup workerGroup = new NioEventLoopGroup(); 29 try { 30 //Netty用于启动NIO服务器的辅助启动类 31 ServerBootstrap sb = new ServerBootstrap(); 32 //将两个NIO线程组传入辅助启动类中 33 sb.group(acceptorGroup, workerGroup) 34 //设置创建的Channel为NioServerSocketChannel类型 35 .channel(NioServerSocketChannel.class) 36 //配置NioServerSocketChannel的TCP参数 37 .option(ChannelOption.SO_BACKLOG, 1024) 38 //设置绑定IO事件的处理类 39 .childHandler(new ChannelInitializer<SocketChannel>() { 40 //创建NIOSocketChannel成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络IO事件 41 @Override 42 protected void initChannel(SocketChannel arg0) throws Exception { 43 44 arg0.pipeline().addLast(new TimeServerHandler()); 45 } 46 }); 47 //绑定端口,同步等待成功(sync():同步阻塞方法,等待bind操作完成才继续) 48 //ChannelFuture主要用于异步操作的通知回调 49 ChannelFuture cf = sb.bind(port).sync(); 50 System.out.println("服务端启动在8080端口。"); 51 //等待服务端监听端口关闭 52 cf.channel().closeFuture().sync(); 53 } finally { 54 //优雅退出,释放线程池资源 55 acceptorGroup.shutdownGracefully(); 56 workerGroup.shutdownGracefully(); 57 } 58 } 59 }

服务端用于对网络资源进行读写操作TimeServerHandler.java

1 package com.studyio.netty; 2 3 import java.util.Date; 4 5 import io.netty.buffer.ByteBuf; 6 import io.netty.buffer.Unpooled; 7 import io.netty.channel.ChannelHandlerAdapter; 8 import io.netty.channel.ChannelHandlerContext; 9 10 /** 11 * 12 * @author lgs 13 * @readme 服务端用于对网络资源进行读写操作,通常我们只需要关注channelRead和exceptionCaught方法。 14 * 15 */ 16 public class TimeServerHandler extends ChannelHandlerAdapter { 17 18 @Override 19 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 20 ByteBuf buf = (ByteBuf) msg; 21 //buf.readableBytes():获取缓冲区中可读的字节数; 22 //根据可读字节数创建数组 23 byte[] req = new byte[buf.readableBytes()]; 24 buf.readBytes(req); 25 String body = new String(req, "UTF-8"); 26 System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body); 27 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; 28 29 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); 30 //将待发送的消息放到发送缓存数组中 31 ctx.writeAndFlush(resp); 32 } 33 }

 

客户端TimeClient.java

1 package com.studyio.netty; 2 3 import io.netty.bootstrap.Bootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioSocketChannel; 11 12 /** 13 * 14 * @author lgs 15 * 客户端 16 */ 17 public class TimeClient { 18 public static void main(String[] args) throws Exception { 19 int port=8080; //服务端默认端口 20 new TimeClient().connect(port, "127.0.0.1"); 21 } 22 public void connect(int port, String host) throws Exception{ 23 //配置客户端NIO线程组 24 EventLoopGroup group = new NioEventLoopGroup(); 25 try { 26 Bootstrap bs = new Bootstrap(); 27 bs.group(group) 28 .channel(NioSocketChannel.class) 29 .option(ChannelOption.TCP_NODELAY, true) 30 .handler(new ChannelInitializer<SocketChannel>() { 31 @Override 32 //创建NIOSocketChannel成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络IO事件 33 protected void initChannel(SocketChannel arg0) throws Exception { 34 arg0.pipeline().addLast(new TimeClientHandler()); 35 } 36 }); 37 //发起异步连接操作 38 ChannelFuture cf = bs.connect(host, port).sync(); 39 //等待客户端链路关闭 40 cf.channel().closeFuture().sync(); 41 } finally { 42 //优雅退出,释放NIO线程组 43 group.shutdownGracefully(); 44 } 45 } 46 }

客户端向服务端发送数据和接收服务端数据TimeClientHandler.java

1 package com.studyio.netty; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerAdapter; 6 import io.netty.channel.ChannelHandlerContext; 7 8 /** 9 * 10 * @author lgs 11 * 客户端向服务端发送数据和接收服务端数据 12 */ 13 public class TimeClientHandler extends ChannelHandlerAdapter { 14 15 @Override 16 //向服务器发送指令 17 public void channelActive(ChannelHandlerContext ctx) throws Exception { 18 for (int i = 0; i < 5; i++) { 19 byte[] req = "QUERY TIME ORDER".getBytes(); 20 ByteBuf firstMessage = Unpooled.buffer(req.length); 21 firstMessage.writeBytes(req); 22 ctx.writeAndFlush(firstMessage); 23 } 24 } 25 26 @Override 27 //接收服务器的响应 28 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 29 ByteBuf buf = (ByteBuf) msg; 30 //buf.readableBytes():获取缓冲区中可读的字节数; 31 //根据可读字节数创建数组 32 byte[] req = new byte[buf.readableBytes()]; 33 buf.readBytes(req); 34 String body = new String(req, "UTF-8"); 35 System.out.println("Now is : "+body); 36 } 37 38 @Override 39 //异常处理 40 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 41 //释放资源 42 ctx.close(); 43 } 44 45 }

五、粘包/拆包问题

TCP粘包拆包问题示例图:

说明:

假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,可能存在以下4种情况。

1、服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;

2、服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包;

3、服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余部分内容,这被称为TCP拆包;

4、服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余内容D1_1和D2包的完整内容;

如果此时服务器TCP接收滑窗非常小,而数据包D1和D2比较大,很有可能发生第五种情况,既服务端分多次才能将D1和D2包接收完全,期间发生多次拆包;

总结:

粘包:客户端发送的数据D2,D1可能被合并成一个D2D1发送到服务端

拆包:客户端发送的数据D2,D1可能被拆分成D2_1,D2_2D1或者D2D1_1,D1_2发送到服务端

粘包拆包问题的解决策略

由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案可归纳如下:

1、消息定长,例如每个报文的大小为固定长度200字节,如果不够,空位补空格;

FixedLengthFrameDecoder

是固定长度解码器,能够按照指定的长度对消息进行自动解码,开发者不需要考虑TCP的粘包/拆包问题。

服务端:

TimeServer.java

1 package com.studyio.nettyFixedLength; 2 3 import io.netty.bootstrap.ServerBootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioServerSocketChannel; 11 import io.netty.handler.codec.FixedLengthFrameDecoder; 12 import io.netty.handler.codec.string.StringDecoder; 13 14 /** 15 * 16 * @author lgs 17 * 处理粘包/拆包问题-消息定长,固定长度处理 18 */ 19 public class TimeServer { 20 21 public static void main(String[] args) throws Exception { 22 int port=8080; //服务端默认端口 23 new TimeServer().bind(port); 24 } 25 public void bind(int port) throws Exception{ 26 //Reactor线程组 27 //1用于服务端接受客户端的连接 28 EventLoopGroup acceptorGroup = new NioEventLoopGroup(); 29 //2用于进行SocketChannel的网络读写 30 EventLoopGroup workerGroup = new NioEventLoopGroup(); 31 try { 32 //Netty用于启动NIO服务器的辅助启动类 33 ServerBootstrap sb = new ServerBootstrap(); 34 //将两个NIO线程组传入辅助启动类中 35 sb.group(acceptorGroup, workerGroup) 36 //设置创建的Channel为NioServerSocketChannel类型 37 .channel(NioServerSocketChannel.class) 38 //配置NioServerSocketChannel的TCP参数 39 .option(ChannelOption.SO_BACKLOG, 1024) 40 //设置绑定IO事件的处理类 41 .childHandler(new ChannelInitializer<SocketChannel>() { 42 @Override 43 protected void initChannel(SocketChannel arg0) throws Exception { 44 //处理粘包/拆包问题-固定长度处理 45 arg0.pipeline().addLast(new FixedLengthFrameDecoder(16)); 46 arg0.pipeline().addLast(new StringDecoder()); 47 arg0.pipeline().addLast(new TimeServerHandler()); 48 } 49 }); 50 //绑定端口,同步等待成功(sync():同步阻塞方法) 51 //ChannelFuture主要用于异步操作的通知回调 52 ChannelFuture cf = sb.bind(port).sync(); 53 54 //等待服务端监听端口关闭 55 cf.channel().closeFuture().sync(); 56 } finally { 57 //优雅退出,释放线程池资源 58 acceptorGroup.shutdownGracefully(); 59 workerGroup.shutdownGracefully(); 60 } 61 } 62 }

TimeServerHandler.java

1 package com.studyio.nettyFixedLength; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerAdapter; 6 import io.netty.channel.ChannelHandlerContext; 7 8 /** 9 * 10 * @author lgs 11 * @readme 用于对网络时间进行读写操作,通常我们只需要关注channelRead和exceptionCaught方法。 12 * 13 */ 14 public class TimeServerHandler extends ChannelHandlerAdapter { 15 16 private int counter; 17 @Override 18 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 19 String body = (String) msg; 20 System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body+". the counter is : "+ ++counter); 21 String currentTime = body; 22 23 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); 24 25 //将待发送的消息放到发送缓存数组中 26 ctx.writeAndFlush(resp); 27 } 28 }

 

客户端:

TimeClient.java

1 package com.studyio.nettyFixedLength; 2 3 import io.netty.bootstrap.Bootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioSocketChannel; 11 import io.netty.handler.codec.FixedLengthFrameDecoder; 12 import io.netty.handler.codec.string.StringDecoder; 13 14 /** 15 * 16 * @author lgs 17 * 处理粘包/拆包问题-消息定长,固定长度处理 18 */ 19 public class TimeClient { 20 public static void main(String[] args) throws Exception { 21 int port=8080; //服务端默认端口 22 new TimeClient().connect(port, "127.0.0.1"); 23 } 24 public void connect(int port, String host) throws Exception{ 25 //配置客户端NIO线程组 26 EventLoopGroup group = new NioEventLoopGroup(); 27 try { 28 Bootstrap bs = new Bootstrap(); 29 bs.group(group) 30 .channel(NioSocketChannel.class) 31 .option(ChannelOption.TCP_NODELAY, true) 32 .handler(new ChannelInitializer<SocketChannel>() { 33 @Override 34 //创建NIOSocketChannel成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络IO事件 35 protected void initChannel(SocketChannel arg0) throws Exception { 36 //处理粘包/拆包问题-固定长度处理 37 arg0.pipeline().addLast(new FixedLengthFrameDecoder(16)); 38 arg0.pipeline().addLast(new StringDecoder()); 39 40 arg0.pipeline().addLast(new TimeClientHandler()); 41 } 42 }); 43 //发起异步连接操作 44 ChannelFuture cf = bs.connect(host, port).sync(); 45 //等待客户端链路关闭 46 cf.channel().closeFuture().sync(); 47 } finally { 48 //优雅退出,释放NIO线程组 49 group.shutdownGracefully(); 50 } 51 } 52 }

TimeClientHandler.java

1 package com.studyio.nettyFixedLength; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerAdapter; 6 import io.netty.channel.ChannelHandlerContext; 7 8 /** 9 * 10 * @author lgs 11 * 12 */ 13 public class TimeClientHandler extends ChannelHandlerAdapter { 14 15 private int counter; 16 private byte[] req; 17 18 @Override 19 //向服务器发送指令 20 public void channelActive(ChannelHandlerContext ctx) throws Exception { 21 ByteBuf message=null; 22 //模拟一百次请求,发送重复内容 23 for (int i = 0; i < 100; i++) { 24 req = ("QUERY TIME ORDER").getBytes(); 25 message=Unpooled.buffer(req.length); 26 message.writeBytes(req); 27 ctx.writeAndFlush(message); 28 } 29 } 30 31 @Override 32 //接收服务器的响应 33 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 34 String body = (String) msg; 35 System.out.println("Now is : "+body+". the counter is : "+ ++counter); 36 } 37 38 @Override 39 //异常处理 40 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 41 //释放资源 42 ctx.close(); 43 } 44 45 }

2、在包尾增加回车换行符进行分割,例如FTP协议;

2.1 处理粘包/拆包问题-回车换行符进行分割

LineBasedFrameDecoder

服务端:

TimeServer.java

1 package com.studyio.nettyLine; 2 3 import io.netty.bootstrap.ServerBootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioServerSocketChannel; 11 import io.netty.handler.codec.LineBasedFrameDecoder; 12 import io.netty.handler.codec.string.StringDecoder; 13 14 /** 15 * 16 * @author lgs 17 * 处理粘包/拆包问题-回车换行符进行分割 18 * 19 */ 20 public class TimeServer { 21 22 public static void main(String[] args) throws Exception { 23 int port=8080; //服务端默认端口 24 new TimeServer().bind(port); 25 } 26 public void bind(int port) throws Exception{ 27 //Reactor线程组 28 //1用于服务端接受客户端的连接 29 EventLoopGroup acceptorGroup = new NioEventLoopGroup(); 30 //2用于进行SocketChannel的网络读写 31 EventLoopGroup workerGroup = new NioEventLoopGroup(); 32 try { 33 //Netty用于启动NIO服务器的辅助启动类 34 ServerBootstrap sb = new ServerBootstrap(); 35 //将两个NIO线程组传入辅助启动类中 36 sb.group(acceptorGroup, workerGroup) 37 //设置创建的Channel为NioServerSocketChannel类型 38 .channel(NioServerSocketChannel.class) 39 //配置NioServerSocketChannel的TCP参数 40 .option(ChannelOption.SO_BACKLOG, 1024) 41 //设置绑定IO事件的处理类 42 .childHandler(new ChannelInitializer<SocketChannel>() { 43 @Override 44 protected void initChannel(SocketChannel arg0) throws Exception { 45 //处理粘包/拆包问题-换行符处理 46 arg0.pipeline().addLast(new LineBasedFrameDecoder(1024)); 47 arg0.pipeline().addLast(new StringDecoder()); 48 49 arg0.pipeline().addLast(new TimeServerHandler()); 50 } 51 }); 52 //绑定端口,同步等待成功(sync():同步阻塞方法) 53 //ChannelFuture主要用于异步操作的通知回调 54 ChannelFuture cf = sb.bind(port).sync(); 55 56 //等待服务端监听端口关闭 57 cf.channel().closeFuture().sync(); 58 } finally { 59 //优雅退出,释放线程池资源 60 acceptorGroup.shutdownGracefully(); 61 workerGroup.shutdownGracefully(); 62 } 63 } 64 }

TimeServerHandler.java

1 package com.studyio.nettyLine; 2 3 import java.util.Date; 4 5 import io.netty.buffer.ByteBuf; 6 import io.netty.buffer.Unpooled; 7 import io.netty.channel.ChannelHandlerAdapter; 8 import io.netty.channel.ChannelHandlerContext; 9 10 /** 11 * 12 * @author lgs 13 * @readme 用于对网络时间进行读写操作,通常我们只需要关注channelRead和exceptionCaught方法。 14 * 处理粘包/拆包问题-回车换行符进行分割 15 */ 16 public class TimeServerHandler extends ChannelHandlerAdapter { 17 18 private int counter; 19 @Override 20 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 21 // ByteBuf buf = (ByteBuf) msg; 22 // //buf.readableBytes():获取缓冲区中可读的字节数; 23 // //根据可读字节数创建数组 24 // byte[] req = new byte[buf.readableBytes()]; 25 // buf.readBytes(req); 26 // String body = new String(req, "UTF-8"); 27 String body = (String) msg; 28 System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body+". the counter is : "+ ++counter); 29 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; 30 currentTime = currentTime + System.getProperty("line.separator"); 31 32 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); 33 //将待发送的消息放到发送缓存数组中 34 ctx.writeAndFlush(resp); 35 } 36 37 }

 

客户端:

TimeClient.java

1 package com.studyio.nettyLine; 2 3 import io.netty.bootstrap.Bootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioSocketChannel; 11 import io.netty.handler.codec.LineBasedFrameDecoder; 12 import io.netty.handler.codec.string.StringDecoder; 13 14 /** 15 * 16 * @author lgs 17 * 处理粘包/拆包问题-回车换行符进行分割 18 * 19 */ 20 public class TimeClient { 21 public static void main(String[] args) throws Exception { 22 int port=8080; //服务端默认端口 23 new TimeClient().connect(port, "127.0.0.1"); 24 } 25 public void connect(int port, String host) throws Exception{ 26 //配置客户端NIO线程组 27 EventLoopGroup group = new NioEventLoopGroup(); 28 try { 29 Bootstrap bs = new Bootstrap(); 30 bs.group(group) 31 .channel(NioSocketChannel.class) 32 .option(ChannelOption.TCP_NODELAY, true) 33 .handler(new ChannelInitializer<SocketChannel>() { 34 @Override 35 //创建NIOSocketChannel成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络IO事件 36 protected void initChannel(SocketChannel arg0) throws Exception { 37 //处理粘包/拆包问题-换行符处理 38 arg0.pipeline().addLast(new LineBasedFrameDecoder(1024)); 39 arg0.pipeline().addLast(new StringDecoder()); 40 41 arg0.pipeline().addLast(new TimeClientHandler()); 42 } 43 }); 44 //发起异步连接操作 45 ChannelFuture cf = bs.connect(host, port).sync(); 46 //等待客户端链路关闭 47 cf.channel().closeFuture().sync(); 48 } finally { 49 //优雅退出,释放NIO线程组 50 group.shutdownGracefully(); 51 } 52 } 53 }

TimeClientHandler.java

1 package com.studyio.nettyLine; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerAdapter; 6 import io.netty.channel.ChannelHandlerContext; 7 8 /** 9 * 10 * @author lgs 11 * 处理粘包/拆包问题-回车换行符进行分割 12 */ 13 public class TimeClientHandler extends ChannelHandlerAdapter { 14 15 private int counter; 16 private byte[] req; 17 18 @Override 19 //向服务器发送指令 20 public void channelActive(ChannelHandlerContext ctx) throws Exception { 21 ByteBuf message=null; 22 //模拟一百次请求,发送重复内容 23 for (int i = 0; i < 200; i++) { 24 //回车换行符System.getProperty("line.separator") 25 req = ("QUERY TIME ORDER"+System.getProperty("line.separator")).getBytes(); 26 message=Unpooled.buffer(req.length); 27 message.writeBytes(req); 28 ctx.writeAndFlush(message); 29 } 30 31 } 32 33 @Override 34 //接收服务器的响应 35 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 36 // ByteBuf buf = (ByteBuf) msg; 37 // //buf.readableBytes():获取缓冲区中可读的字节数; 38 // //根据可读字节数创建数组 39 // byte[] req = new byte[buf.readableBytes()]; 40 // buf.readBytes(req); 41 // String body = new String(req, "UTF-8"); 42 String body = (String) msg; 43 System.out.println("Now is : "+body+". the counter is : "+ ++counter); 44 } 45 46 @Override 47 //异常处理 48 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 49 //释放资源 50 ctx.close(); 51 } 52 53 }

2.2 处理粘包/拆包问题自定义分隔符进行分割

DelimiterBasedFrameDecoder

实现自定义分隔符作为消息的结束标志,完成解码。

服务端:

TimeServer.java

1 package com.studyio.nettyDelimiter; 2 3 import io.netty.bootstrap.ServerBootstrap; 4 import io.netty.buffer.ByteBuf; 5 import io.netty.buffer.Unpooled; 6 import io.netty.channel.ChannelFuture; 7 import io.netty.channel.ChannelInitializer; 8 import io.netty.channel.ChannelOption; 9 import io.netty.channel.EventLoopGroup; 10 import io.netty.channel.nio.NioEventLoopGroup; 11 import io.netty.channel.socket.SocketChannel; 12 import io.netty.channel.socket.nio.NioServerSocketChannel; 13 import io.netty.handler.codec.DelimiterBasedFrameDecoder; 14 import io.netty.handler.codec.string.StringDecoder; 15 16 /** 17 * 18 * @author lgs 19 * 处理粘包/拆包问题-定义分隔符 20 * 21 */ 22 public class TimeServer { 23 public static void main(String[] args) throws Exception { 24 int port=8080; //服务端默认端口 25 new TimeServer().bind(port); 26 } 27 28 public void bind(int port) throws Exception{ 29 //Reactor线程组 30 //1用于服务端接受客户端的连接 31 EventLoopGroup acceptorGroup = new NioEventLoopGroup(); 32 //2用于进行SocketChannel的网络读写 33 EventLoopGroup workerGroup = new NioEventLoopGroup(); 34 try { 35 //Netty用于启动NIO服务器的辅助启动类 36 ServerBootstrap sb = new ServerBootstrap(); 37 //将两个NIO线程组传入辅助启动类中 38 sb.group(acceptorGroup, workerGroup) 39 //设置创建的Channel为NioServerSocketChannel类型 40 .channel(NioServerSocketChannel.class) 41 //配置NioServerSocketChannel的TCP参数 42 .option(ChannelOption.SO_BACKLOG, 1024) 43 //设置绑定IO事件的处理类 44 .childHandler(new ChannelInitializer<SocketChannel>() { 45 @Override 46 protected void initChannel(SocketChannel arg0) throws Exception { 47 //处理粘包/拆包问题-自定义分隔符处理 48 ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); 49 arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); 50 arg0.pipeline().addLast(new StringDecoder()); 51 arg0.pipeline().addLast(new TimeServerHandler()); 52 } 53 }); 54 //绑定端口,同步等待成功(sync():同步阻塞方法) 55 //ChannelFuture主要用于异步操作的通知回调 56 ChannelFuture cf = sb.bind(port).sync(); 57 58 //等待服务端监听端口关闭 59 cf.channel().closeFuture().sync(); 60 } finally { 61 //优雅退出,释放线程池资源 62 acceptorGroup.shutdownGracefully(); 63 workerGroup.shutdownGracefully(); 64 } 65 } 66 }

TimeServerHandler.java

1 package com.studyio.nettyDelimiter; 2 3 import java.util.Date; 4 5 import io.netty.buffer.ByteBuf; 6 import io.netty.buffer.Unpooled; 7 import io.netty.channel.ChannelHandlerAdapter; 8 import io.netty.channel.ChannelHandlerContext; 9 10 /** 11 * 12 * @author lgs 13 * 处理粘包/拆包问题-定义分隔符 14 * @readme 用于对网络时间进行读写操作,通常我们只需要关注channelRead和exceptionCaught方法。 15 * 16 */ 17 public class TimeServerHandler extends ChannelHandlerAdapter { 18 19 private int counter; 20 @Override 21 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 22 String body = (String) msg; 23 System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body+". the counter is : "+ ++counter); 24 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; 25 //处理粘包/拆包问题-定义分隔符 26 currentTime += "$_"; 27 28 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); 29 //将待发送的消息放到发送缓存数组中 30 ctx.writeAndFlush(resp); 31 } 32 }

 

客户端:

TimeClient.java

1 package com.studyio.nettyDelimiter; 2 3 import io.netty.bootstrap.Bootstrap; 4 import io.netty.buffer.ByteBuf; 5 import io.netty.buffer.Unpooled; 6 import io.netty.channel.ChannelFuture; 7 import io.netty.channel.ChannelInitializer; 8 import io.netty.channel.ChannelOption; 9 import io.netty.channel.EventLoopGroup; 10 import io.netty.channel.nio.NioEventLoopGroup; 11 import io.netty.channel.socket.SocketChannel; 12 import io.netty.channel.socket.nio.NioSocketChannel; 13 import io.netty.handler.codec.DelimiterBasedFrameDecoder; 14 import io.netty.handler.codec.string.StringDecoder; 15 16 /** 17 * 18 * @author lgs 19 * 处理粘包/拆包问题-定义分隔符 20 * 21 */ 22 public class TimeClient { 23 public static void main(String[] args) throws Exception { 24 int port=8080; //服务端默认端口 25 new TimeClient().connect(port, "127.0.0.1"); 26 } 27 public void connect(int port, String host) throws Exception{ 28 //配置客户端NIO线程组 29 EventLoopGroup group = new NioEventLoopGroup(); 30 try { 31 Bootstrap bs = new Bootstrap(); 32 bs.group(group) 33 .channel(NioSocketChannel.class) 34 .option(ChannelOption.TCP_NODELAY, true) 35 .handler(new ChannelInitializer<SocketChannel>() { 36 @Override 37 //创建NIOSocketChannel成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络IO事件 38 protected void initChannel(SocketChannel arg0) throws Exception { 39 //处理粘包/拆包问题-自定义分隔符处理 40 ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); 41 arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); 42 arg0.pipeline().addLast(new StringDecoder()); 43 44 arg0.pipeline().addLast(new TimeClientHandler()); 45 } 46 }); 47 //发起异步连接操作 48 ChannelFuture cf = bs.connect(host, port).sync(); 49 //等待客户端链路关闭 50 cf.channel().closeFuture().sync(); 51 } finally { 52 //优雅退出,释放NIO线程组 53 group.shutdownGracefully(); 54 } 55 } 56 }

TimeClientHandler.java

1 package com.studyio.nettyDelimiter; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerAdapter; 6 import io.netty.channel.ChannelHandlerContext; 7 8 /** 9 * 10 * @author lgs 11 * 处理粘包/拆包问题-定义分隔符 12 * 13 */ 14 public class TimeClientHandler extends ChannelHandlerAdapter { 15 16 private int counter; 17 private byte[] req; 18 19 @Override 20 //向服务器发送指令 21 public void channelActive(ChannelHandlerContext ctx) throws Exception { 22 ByteBuf message=null; 23 //模拟一百次请求,发送重复内容 24 for (int i = 0; i < 200; i++) { 25 //处理粘包/拆包问题-定义分隔符 26 req = ("QUERY TIME ORDER"+"$_").getBytes(); 27 message=Unpooled.buffer(req.length); 28 message.writeBytes(req); 29 ctx.writeAndFlush(message); 30 } 31 32 } 33 34 @Override 35 //接收服务器的响应 36 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 37 String body = (String) msg; 38 System.out.println("Now is : "+body+". the counter is : "+ ++counter); 39 } 40 41 @Override 42 //异常处理 43 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 44 //释放资源 45 ctx.close(); 46 } 47 48 }

3、将消息分为消息头和消息体,消息头中包含消息总长度(或消息体总长度)的字段,通常设计思路为消息头的第一个字段使用int32来表示消息的总程度;

4、更复杂的应用层协议;

 六、Netty高性能的原因

1、异步非阻塞通信

在IO编程过程中,当需要同时处理多个客户端接入请求时,可以利用多线程或者IO多路复用技术进行处理。IO多路复用技术通过把多个IO的阻塞复用到同一个Selector的阻塞上,从而使得系统在单线程的情况下可以同时处理多个客户端请求。与传统的多线程/多进程模型相比,IO多路复用的最大优势是系统开销小,系统不需要创建新的额外进程或者线程,也不需要维护这些进程和线程的运行,降低了系统的维护工作量,节省了系统资源。

Netty的IO线程NioEventLoop由于聚合了多路复用器Selector,可以同时并发处理成百上千个客户端SocketChannel。由于读写操作都是非阻塞的,这就可以充分提升IO线程的运行效率,避免由频繁的IO阻塞导致的线程挂起。另外,由于Netty采用了异步通信模式,一个IO线程可以并发处理N个客户端连接和读写操作,这从根本上解决了传统同步阻塞IO中 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。

2、高效的Reactor线程模型

常用的Reactor线程模型有三种,分别如下:

Reactor单线程模型:

 

Reactor单线程模型,指的是所有的IO操作都在同一个NIO线程上面完成,NIO线程职责如下:

1、作为NIO服务端,接收客户端的TCP连接;

2、作为NIO客户端,向服务端发起TCP连接;

3、读取通信对端的请求或者应答消息;

4、向通信对端发送请求消息或者应答消息;

由于Reactor模式使用的是异步非阻塞IO,所有的IO操作都不会导致阻塞,理论上一个线程可以独立处理所有IO相关操作。从架构层面看,一个NIO线程确实可以完成其承担的职责。例如,通过Acceptor接收客户端的TCP连接请求消息,链路建立成功之后,通过Dispatch将对应的ByteBuffer派发到指定的Handler上进行消息编码。用户Handler可以通过NIO线程将消息发送给客户端。

对于一些小容量应用场景,可以使用单线程模型,但是对于高负载、大并发的应用却不合适,主要原因如下:

1、一个NIO线程同时处理成百上千的链路,性能上无法支撑。即便NIO线程的CPU负荷达到100%,也无法满足海量消息的编码、解码、读取和发送;

2、当NIO线程负载过重后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了NIO线程的负载,最终会导致大量消息积压和处理超时,NIO线程会成为系统的性能瓶颈;

3、可靠性问题。一旦NIO线程意外进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。

为了解决这些问题,从而演进出了Reactor多线程模型。

Reactor多线程模型:

 

Reactor多线程模型与单线程模型最大的区别就是有一组NIO线程处理IO操作,特点如下:

1、有一个专门的NIO线程——Acceptor线程用于监听服务端,接收客户端TCP连接请求;

2、网络IO操作——读、写等由一个NIO线程池负责,线程池可以采用标准的JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、编码、解码和发送;

3、1个NIO线程可以同时处理N条链路,但是1个链路只对应1个NIO线程,防止发生并发操作问题。

在绝大多数场景下,Reactor多线程模型都可以满足性能需求;但是,在极特殊应用场景中,一个NIO线程负责监听和处理所有的客户端连接可能会存在性能问题。例如百万客户端并发连接,或者服务端需要对客户端的握手消息进行安全认证,认证本身非常损耗性能。在这类场景下,单独一个Acceptor线程可能会存在性能不足问题,为了解决性能问题,产生了第三种Reactor线程模型——主从Reactor多线程模型。

主从Reactor多线程模型:

 

主从Reactor线程模型的特点是:服务端用于接收客户端连接的不再是一个单独的NIO线程,而是一个独立的NIO线程池。Acceptor接收到客户端TCP连接请求处理完成后(可能包含接入认证等),将新创建的SocketChannel注册到IO线程池(subReactor线程池)的某个IO线程上,由它负责SocketChannel的读写和编解码工作。Acceptor线程池只用于客户端的登录、握手和安全认证,一旦链路建立成功,就将链路注册到后端subReactor线程池的IO线程上,由IO线程负责后续的IO操作。

利用主从NIO线程模型,可以解决1个服务端监听线程无法有效处理所有客户端连接的性能不足问题。Netty官方推荐使用该线程模型。它的工作流程总结如下:

1、从主线程池中随机选择一个Reactor线程作为Acceptor线程,用于绑定监听端口,接收客户端连接;

2、Acceptor线程接收客户端连接请求之后,创建新的SocketChannel,将其注册到主线程池的其他Reactor线程上,由其负责接入认证、IP黑白名单过滤、握手等操作;

3、然后也业务层的链路正式建立成功,将SocketChannel从主线程池的Reactor线程的多路复用器上摘除,重新注册到Sub线程池的线程上,用于处理IO的读写操作。

3、无锁化的串行设计

在大多数场景下,并行多线程处理可以提升系统的并发性能。但是,如果对于共享资源的并发访问处理不当,会带来严重的锁竞争,这最终会导致性能的下降。为了尽可能地避免锁竞争带来的性能损耗,可以通过串行化设计,既消息的处理尽可能在同一个线程内完成,期间不进行线程切换,这样就避免了多线程竞争和同步锁。

为了尽可能提升性能,Netty采用了串行无锁化设计,在IO线程内部进行串行操作,避免多线程竞争导致的性能下降。表面上看,串行化设计似乎CPU利用率不高,并发程度不够。但是,通过调整NIO线程池的线程参数,可以同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程设计相比一个队列——多个工作线程模型性能更优。

Netty串行化设计工作原理图如下:

Netty的NioEventLoop读取到消息后,直接调用ChannelPipeline的fireChannelRead(Object msg),只要用户不主动切换线程,一直会由NioEventLoop调用到用户的Handler,期间不进行线程切换。这种串行化处理方式避免了多线程导致的锁竞争,从性能角度看是最优的。

4、高效的并发编程

Netty中高效并发编程主要体现:

1、volatile的大量、正确使用;

2、CAS和原子类的广泛使用;

3、线程安全容器的使用;

4、通过读写锁提升并发性能。

5、高性能的序列化框架

    影响序列化性能的关键因素总结如下:

    1、序列化后的码流大小(网络宽带的占用);

     2、序列化与反序列化的性能(CPU资源占用);

    3、是否支持跨语言(异构系统的对接和开发语言切换)。

    Netty默认提供了对GoogleProtobuf的支持,通过扩展Netty的编解码接口,用户可以实现其他的高性能序列化框架

6、零拷贝

    Netty的“零拷贝”主要体现在三个方面:

    1)、Netty的接收和发送ByteBuffer采用DIRECT BUFFERS,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(HEAP BUFFERS)进行Socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才写入Socket中。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。

    2)、第二种“零拷贝 ”的实现CompositeByteBuf,它对外将多个ByteBuf封装成一个ByteBuf,对外提供统一封装后的ByteBuf接口。

    3)、第三种“零拷贝”就是文件传输,Netty文件传输类DefaultFileRegion通过transferTo方法将文件发送到目标Channel中。很多操作系统直接将文件缓冲区的内容发送到目标Channel中,而不需要通过循环拷贝的方式,这是一种更加高效的传输方式,提升了传输性能,降低了CPU和内存占用,实现了文件传输的“零拷贝”。

7、内存池

    随着JVM虚拟机和JIT即时编译技术的发展,对象的分配和回收是个非常轻量级的工作。但是对于缓冲区Buffer,情况却稍有不同,特别是对于堆外直接内存的分配和回收,是一件耗时的操作。为了尽量重用缓冲区,Netty提供了基于内存池的缓冲区重用机制。

1 package com.studyio.netty; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.PooledByteBufAllocator; 5 import io.netty.buffer.Unpooled; 6 7 /** 8 * 9 * @author lgs 10 * 通过内存池的方式构建直接缓冲区 11 */ 12 public class PooledByteBufDemo { 13 14 public static void main(String[] args) { 15 byte[] content = new byte[1024]; 16 int loop = 3000000; 17 long startTime = System.currentTimeMillis(); 18 19 ByteBuf poolBuffer = null; 20 for (int i = 0; i < loop; i++) { 21 //通过内存池的方式构建直接缓冲区 22 poolBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(1024); 23 poolBuffer.writeBytes(content); 24 //释放buffer 25 poolBuffer.release(); 26 } 27 long startTime2 = System.currentTimeMillis(); 28 ByteBuf buffer = null; 29 for (int i = 0; i < loop; i++) { 30 //通过非内存池的方式构建直接缓冲区 31 buffer = Unpooled.directBuffer(1024); 32 buffer.writeBytes(content); 33 buffer.release(); 34 } 35 long endTime = System.currentTimeMillis(); 36 System.out.println("The PooledByteBuf use time :"+(startTime2-startTime)); 37 System.out.println("The UnpooledByteBuf use time :"+(endTime-startTime2)); 38 } 39 }

 运行结果:内存池的方式构建直接缓冲区效率更高

The PooledByteBuf use time :740 The UnpooledByteBuf use time :1025

8、灵活的TCP参数配置能力

    Netty在启动辅助类中可以灵活的配置TCP参数,满足不同的用户场景。合理设置TCP参数在某些场景下对于性能的提升可以起到的显著的效果,总结一下对性能影响比较大的几个配置项:

    1)、SO_RCVBUF和SO_SNDBUF:通常建议值为128KB或者256KB;

    2)、TCP_NODELAY:NAGLE算法通过将缓冲区内的小封包自动相连,组成较大的封包,阻止大量小封包的发送阻塞网络,从而提高网络应用效率。但是对于时延敏感的应用场景需要关闭该优化算法;

    3)、软中断:如果Linux内核版本支持RPS(2.6.35以上版本),开启RPS后可以实现软中断,提升网络吞吐量。RPS根据数据包的源地址,目的地址以及目的和源端口,计算出一个hash值,然后根据这个hash值来选择软中断运行的CPU。从上层来看,也就是说将每个连接和CPU绑定,并通过这个hash值,来均衡软中断在多个CPU上,提升网络并行处理性能。

转载请注明原文地址: https://www.6miu.com/read-3450144.html

最新回复(0)