TCP出现粘包拆包原因有三个:
1.应用程序write写入的字节大小大于套接口发送缓冲区大小;
2.进行MSS(最大报文长度)大小的TCP分段(TCP报文长度-TCP头部长度>MSS的时候将发生拆包);
3.以太网帧的payload大于MTU(最大数据包大小)进行IP分片。
粘包的解决方案:
1.消息定长,例如每个报文的大小为固定长度200字节,如果不够,空格补空格;
2.在包尾增加回车换行符进行分割,例如FTP协议;
3.将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段,通常设计思路为消息头的第一个字段使用int32来表示消息的总长度;
4.更复杂的应用层协议。
首先使用未加Netty的半包解码器来看粘包拆包问题:
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class TimeClient { public void connect(int port, String host) throws Exception { // 配置客户端NIO线程组 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel arg0) throws Exception { System.out.println("客户端启动.."); arg0.pipeline().addLast(new TimeClientHandler()); } }); // 发起异步连接操作 ChannelFuture f = b.connect(host, port).sync(); // 等待客户端链路关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放NIO线程组 group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; new TimeClient().connect(port, "127.0.0.1"); } } import java.util.logging.Logger; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class TimeClientHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = Logger .getLogger(com.netty2.TimeClientHandler2.class.getName()); private int counter; private byte[] req; public TimeClientHandler() { req = "QUERY TIME ORDER".getBytes(); // firstMessage = Unpooled.buffer(req.length); // firstMessage.writeBytes(req); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //与服务端建立连接后 System.out.println("客户端 channelActive.."); ByteBuf message=null; for (int i=0;i<100;i++){ message=Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("客户端 channelRead.."); //服务端返回消息后 ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("当前时间 :" + body +" 次数:"+ (++counter)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("客户端异常.."); // 释放资源 logger.warning("Unexpected exception from downstream:" + cause.getMessage()); ctx.close(); } } import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class TimeServer { public void bind(int port) throws Exception { //配置服务端的NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChildChannelHandler()); // 绑定端口,同步等待成功 ChannelFuture f=b.bind(port).sync(); //等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { //退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{ protected void initChannel(SocketChannel ch) throws Exception { System.out.println("服务端启动……"); ch.pipeline().addLast(new TimeServerHandler()); } } public static void main(String[] args) throws Exception { int port=8080; new TimeServer().bind(port); } } import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Date; public class TimeServerHandler extends ChannelInboundHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("服务器启动.."); ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("服务器接收数据:" + body+" 次数:"+ (++counter)); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date( System.currentTimeMillis()).toString() : "BAD ORDER"; ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.writeAndFlush(resp); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("服务器读取完毕.."); ctx.flush();//刷新后才将数据发出到SocketChannel } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("服务器异常.."); ctx.close(); } }可以看到它服务端和客户端只接收到了两次。
LineBasedFrameDecoder的工作原理是它依次遍历ByteBuf中可读字节,判断看是否有“\n”或者“\r\n”,如果有就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后任然没有发现换行符就会抛出异常,同时忽略之前读取到的异常码流。
StringDecoder的功能非常简单,就是将接收到的对象转换成字符串,然后继续调用后面的Handler。LineBasedFrameDecoder+StringDecoder组合就是按行切换的文本解码器,他被设计用来支持TCP的粘包和拆包。