1、什么是粘包/拆包
一般所谓的TCP粘包是在一次接收数据不能完全地体现一个完整的消息数据。TCP通讯为何存在粘包呢?主要原因是TCP是以流的方式来处理数据,再加上网络上MTU的往往小于在应用处理的消息数据,所以就会引发一次接收的数据无法满足消息的需要,导致粘包的存在。处理粘包的唯一方法就是制定应用层的数据通讯协议,通过协议来规范现有接收的数据是否满足消息数据的需要。
2、解决办法
2.1、消息定长,报文大小固定长度,不够空格补全,发送和接收方遵循相同的约定,这样即使粘包了通过接收方编程实现获取定长报文也能区分。
2.2、包尾添加特殊分隔符,例如每条报文结束都添加回车换行符(例如FTP协议)或者指定特殊字符作为报文分隔符,接收方通过特殊分隔符切分报文区分。
2.3、将消息分为消息头和消息体,消息头中包含表示信息的总长度(或者消息体长度)的字段
3、自定义协议,来实现TCP的粘包/拆包问题
3.0 自定义协议,开始标记
3.1 自定义协议的介绍
3.2 自定义协议的类的封装
3.3 自定义协议的编码器
3.4 自定义协议的解码器
4、协议相关的实现
4.1 协议的封装
[java] view plain copy print ? import java.util.Arrays; /** * <pre> * 自己定义的协议 * 数据包格式 * +——----——+——-----——+——----——+ * |协议开始标志| 长度 | 数据 | * +——----——+——-----——+——----——+ * 1.协议开始标志head_data,为int类型的数据,16进制表示为0X76 * 2.传输数据的长度contentLength,int类型 * 3.要传输的数据 * </pre> */ public class SmartCarProtocol { /** * 消息的开头的信息标志 */ private int head_data = ConstantValue.HEAD_DATA; /** * 消息的长度 */ private int contentLength; /** * 消息的内容 */ private byte[] content; /** * 用于初始化,SmartCarProtocol * * @param contentLength * 协议里面,消息数据的长度 * @param content * 协议里面,消息的数据 */ public SmartCarProtocol(int contentLength, byte[] content) { this.contentLength = contentLength; this.content = content; } public int getHead_data() { return head_data; } public int getContentLength() { return contentLength; } public void setContentLength(int contentLength) { this.contentLength = contentLength; } public byte[] getContent() { return content; } public void setContent(byte[] content) { this.content = content; } @Override public String toString() { return "SmartCarProtocol [head_data=" + head_data + ", contentLength=" + contentLength + ", content=" + Arrays.toString(content) + "]"; } }
4.2 协议的编码器
[java] view plain copy print ? /** * <pre> * 自己定义的协议 * 数据包格式 * +——----——+——-----——+——----——+ * |协议开始标志| 长度 | 数据 | * +——----——+——-----——+——----——+ * 1.协议开始标志head_data,为int类型的数据,16进制表示为0X76 * 2.传输数据的长度contentLength,int类型 * 3.要传输的数据 * </pre> */ public class SmartCarEncoder extends MessageToByteEncoder<SmartCarProtocol> { @Override protected void encode(ChannelHandlerContext tcx, SmartCarProtocol msg, ByteBuf out) throws Exception { // 写入消息SmartCar的具体内容 // 1.写入消息的开头的信息标志(int类型) out.writeInt(msg.getHead_data()); // 2.写入消息的长度(int 类型) out.writeInt(msg.getContentLength()); // 3.写入消息的内容(byte[]类型) out.writeBytes(msg.getContent()); } }
4.3 协议的解码器
[java] view plain copy print ? import java.util.List; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; /** * <pre> * 自己定义的协议 * 数据包格式 * +——----——+——-----——+——----——+ * |协议开始标志| 长度 | 数据 | * +——----——+——-----——+——----——+ * 1.协议开始标志head_data,为int类型的数据,16进制表示为0X76 * 2.传输数据的长度contentLength,int类型 * 3.要传输的数据,长度不应该超过2048,防止socket流的攻击 * </pre> */ public class SmartCarDecoder extends ByteToMessageDecoder { /** * <pre> * 协议开始的标准head_data,int类型,占据4个字节. * 表示数据的长度contentLength,int类型,占据4个字节. * </pre> */ public final int BASE_LENGTH = 4 + 4; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception { // 可读长度必须大于基本长度 if (buffer.readableBytes() >= BASE_LENGTH) { // 防止socket字节流攻击 // 防止,客户端传来的数据过大 // 因为,太大的数据,是不合理的 if (buffer.readableBytes() > 2048) { buffer.skipBytes(buffer.readableBytes()); } // 记录包头开始的index int beginReader; while (true) { // 获取包头开始的index beginReader = buffer.readerIndex(); // 标记包头开始的index buffer.markReaderIndex(); // 读到了协议的开始标志,结束while循环 if (buffer.readInt() == ConstantValue.HEAD_DATA) { break; } // 未读到包头,略过一个字节 // 每次略过,一个字节,去读取,包头信息的开始标记 buffer.resetReaderIndex(); buffer.readByte(); // 当略过,一个字节之后, // 数据包的长度,又变得不满足 // 此时,应该结束。等待后面的数据到达 if (buffer.readableBytes() < BASE_LENGTH) { return; } } // 消息的长度 int length = buffer.readInt(); // 判断请求数据包数据是否到齐 if (buffer.readableBytes() < length) { // 还原读指针 buffer.readerIndex(beginReader); return; } // 读取data数据 byte[] data = new byte[length]; buffer.readBytes(data); SmartCarProtocol protocol = new SmartCarProtocol(data.length, data); out.add(protocol); } } }
4.4 服务端加入协议的编/解码器
4.5 客户端加入协议的编/解码器
5、服务端的实现
[java] view plain copy print ? import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class Server { public Server() { } public void bind(int port) throws Exception { // 配置NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 服务器辅助启动类配置 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChildChannelHandler())// .option(ChannelOption.SO_BACKLOG, 1024) // 设置tcp缓冲区 // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // 绑定端口 同步等待绑定成功 ChannelFuture f = b.bind(port).sync(); // (7) // 等到服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { // 优雅释放线程资源 workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } /** * 网络事件处理器 */ private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { // 添加自定义协议的编解码工具 ch.pipeline().addLast(new SmartCarEncoder()); ch.pipeline().addLast(new SmartCarDecoder()); // 处理网络IO ch.pipeline().addLast(new ServerHandler()); } } public static void main(String[] args) throws Exception { new Server().bind(9999); } } 6、服务端Handler的实现
[java] view plain copy print ? import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class ServerHandler extends ChannelHandlerAdapter { // 用于获取客户端发送的信息 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 用于获取客户端发来的数据信息 SmartCarProtocol body = (SmartCarProtocol) msg; System.out.println("Server接受的客户端的信息 :" + body.toString()); // 会写数据给客户端 String str = "Hi I am Server ..."; SmartCarProtocol response = new SmartCarProtocol(str.getBytes().length, str.getBytes()); // 当服务端完成写操作后,关闭与客户端的连接 ctx.writeAndFlush(response); // .addListener(ChannelFutureListener.CLOSE); // 当有写操作时,不需要手动释放msg的引用 // 当只有读操作时,才需要手动释放msg的引用 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // cause.printStackTrace(); ctx.close(); } } 7、客户端的实现
[java] view plain copy print ? import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class Client { /** * 连接服务器 * * @param port * @param host * @throws Exception */ public void connect(int port, String host) throws Exception { // 配置客户端NIO线程组 EventLoopGroup group = new NioEventLoopGroup(); try { // 客户端辅助启动类 对客户端配置 Bootstrap b = new Bootstrap(); b.group(group)// .channel(NioSocketChannel.class)// .option(ChannelOption.TCP_NODELAY, true)// .handler(new MyChannelHandler());// // 异步链接服务器 同步等待链接成功 ChannelFuture f = b.connect(host, port).sync(); // 等待链接关闭 f.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); System.out.println("客户端优雅的释放了线程资源..."); } } /** * 网络事件处理器 */ private class MyChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { // 添加自定义协议的编解码工具 ch.pipeline().addLast(new SmartCarEncoder()); ch.pipeline().addLast(new SmartCarDecoder()); // 处理网络IO ch.pipeline().addLast(new ClientHandler()); } } public static void main(String[] args) throws Exception { new Client().connect(9999, "127.0.0.1"); } } 8、客户端Handler的实现
[java] view plain copy print ? import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; //用于读取客户端发来的信息 public class ClientHandler extends ChannelHandlerAdapter { // 客户端与服务端,连接成功的售后 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 发送SmartCar协议的消息 // 要发送的信息 String data = "I am client ..."; // 获得要发送信息的字节数组 byte[] content = data.getBytes(); // 要发送信息的长度 int contentLength = content.length; SmartCarProtocol protocol = new SmartCarProtocol(contentLength, content); ctx.writeAndFlush(protocol); } // 只是读数据,没有写数据的话 // 需要自己手动的释放的消息 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { // 用于获取客户端发来的数据信息 SmartCarProtocol body = (SmartCarProtocol) msg; System.out.println("Client接受的客户端的信息 :" + body.toString()); } finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }