Netty粘包拆包(一)

xiaoxiao2022-06-11  35

TCP出现粘包拆包原因有三个:

1.应用程序write写入的字节大小大于套接口发送缓冲区大小;

2.进行MSS(最大报文长度)大小的TCP分段(TCP报文长度-TCP头部长度>MSS的时候将发生拆包);

3.以太网帧的payload大于MTU(最大数据包大小)进行IP分片。

粘包的解决方案:

1.消息定长,例如每个报文的大小为固定长度200字节,如果不够,空格补空格;

2.在包尾增加回车换行符进行分割,例如FTP协议;

3.将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段,通常设计思路为消息头的第一个字段使用int32来表示消息的总长度;

4.更复杂的应用层协议。

首先使用未加Netty的半包解码器来看粘包拆包问题:

import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class TimeClient { public void connect(int port, String host) throws Exception { // 配置客户端NIO线程组 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel arg0) throws Exception { System.out.println("客户端启动.."); arg0.pipeline().addLast(new TimeClientHandler()); } }); // 发起异步连接操作 ChannelFuture f = b.connect(host, port).sync(); // 等待客户端链路关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放NIO线程组 group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; new TimeClient().connect(port, "127.0.0.1"); } } import java.util.logging.Logger; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class TimeClientHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = Logger .getLogger(com.netty2.TimeClientHandler2.class.getName()); private int counter; private byte[] req; public TimeClientHandler() { req = "QUERY TIME ORDER".getBytes(); // firstMessage = Unpooled.buffer(req.length); // firstMessage.writeBytes(req); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //与服务端建立连接后 System.out.println("客户端 channelActive.."); ByteBuf message=null; for (int i=0;i<100;i++){ message=Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("客户端 channelRead.."); //服务端返回消息后 ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("当前时间 :" + body +" 次数:"+ (++counter)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("客户端异常.."); // 释放资源 logger.warning("Unexpected exception from downstream:" + cause.getMessage()); ctx.close(); } } import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class TimeServer { public void bind(int port) throws Exception { //配置服务端的NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChildChannelHandler()); // 绑定端口,同步等待成功 ChannelFuture f=b.bind(port).sync(); //等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { //退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{ protected void initChannel(SocketChannel ch) throws Exception { System.out.println("服务端启动……"); ch.pipeline().addLast(new TimeServerHandler()); } } public static void main(String[] args) throws Exception { int port=8080; new TimeServer().bind(port); } } import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Date; public class TimeServerHandler extends ChannelInboundHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("服务器启动.."); ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("服务器接收数据:" + body+" 次数:"+ (++counter)); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date( System.currentTimeMillis()).toString() : "BAD ORDER"; ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.writeAndFlush(resp); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("服务器读取完毕.."); ctx.flush();//刷新后才将数据发出到SocketChannel } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("服务器异常.."); ctx.close(); } }

可以看到它服务端和客户端只接收到了两次。

利用LineBasedFrameDecoder解决TCP粘包问题

import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class TimeClient2 { public void connect(int port, String host) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new TimeClientHandler2()); } }); // 发起异步连接操作 ChannelFuture f = b.connect(host, port).sync(); // 等待客户端连接关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放NIO线程组 group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8082; new TimeClient2().connect(port, "127.0.0.1"); } } import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.logging.Logger; public class TimeClientHandler2 extends ChannelInboundHandlerAdapter { private static final Logger logger = Logger .getLogger(TimeClientHandler2.class.getName()); private int counter; private byte[] req; public TimeClientHandler2() { req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes(); } @Override public void channelActive(ChannelHandlerContext ctx){ //与服务端建立连接后 System.out.println("客户端 channelActive.."); ByteBuf message=null; for (int i=0;i<100;i++){ message=Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("客户端 channelRead.."); //服务端返回消息后 String body = (String) msg; System.out.println("当前时间 :" + body + " 次数:" + (++counter)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("客户端异常.."); // 释放资源 logger.warning("Unexpected exception from downstream:" + cause.getMessage()); ctx.close(); } } import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class TimeServer2 { public void bind(int port) throws Exception { // NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChildChannelHandler()); // 绑定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel arg0) throws Exception { arg0.pipeline().addLast(new LineBasedFrameDecoder(1024)); arg0.pipeline().addLast(new StringDecoder()); arg0.pipeline().addLast(new TimeServerHandler2()); } } public static void main(String[] args) throws Exception { int port=8082; new TimeServer2().bind(port); } } import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Date; public class TimeServerHandler2 extends ChannelInboundHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg){ System.out.println("服务器启动.."); String body = (String) msg; System.out.println("服务器接收数据:" + body+" 次数:"+ (++counter)); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date( System.currentTimeMillis()).toString() : "BAD ORDER"; if(counter%2==0) currentTime = currentTime + System.getProperty("line.separator"); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.writeAndFlush(resp); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("服务器读取完毕.."); ctx.flush();//刷新后才将数据发出到SocketChannel } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { System.out.println("服务器异常.."); ctx.close(); } }

 

    LineBasedFrameDecoder的工作原理是它依次遍历ByteBuf中可读字节,判断看是否有“\n”或者“\r\n”,如果有就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后任然没有发现换行符就会抛出异常,同时忽略之前读取到的异常码流。

    StringDecoder的功能非常简单,就是将接收到的对象转换成字符串,然后继续调用后面的Handler。LineBasedFrameDecoder+StringDecoder组合就是按行切换的文本解码器,他被设计用来支持TCP的粘包和拆包。

转载请注明原文地址: https://www.6miu.com/read-4931875.html

最新回复(0)