Netty处理TCP粘包、拆包问题之DelimiterBasedFrameDecoder

xiaoxiao2021-02-28  41

熟悉TCP编程的可能都知道,无论是服务器还是客户端,当我们读取或者发送数据的时候,都需要考虑TCP低层的粘包/拆包机制。

TCP一个“流”协议,所谓流就是没有界限的传送数据。大家可以想象一下,如果河里的水就好比数据,他们是连成一片的,没有分界线,TCP低层并不了解上层的业务数据具体的含义,它会根据TCP缓冲区的实际情况进行包的划分,就是说,在业务上,我么一个完成的包可能会被TCP分成多个包进行发送,也可能把多个小包封装成一个大的数据包发送出去,这就是所谓的TCP尿包、拆包问题。

稍微分析一下TCP粘包、拆包问题的产生原因:

    1、应用程序write写入的字节大小大于套接口发送缓冲区的大小

    2、进行MSS大小的TCP分段

    3、以太网帧的payload大于MTU进行IP分片。

粘包拆包问题的解决方法,根据业界主流协议的有三种方案:

    1、消息定长,例如每个报文的大小固定为200个字节,如果不够,空位补空格。

    2、在包尾部增加特殊字符进行分割,例如加回车等

    3、将消息分为消息头和消息体,在消息头中包含表示消息总长度的字段,然后进行业务逻辑的处理。

 

Netty对以上3种应用做了抽象,提供了4种解码器,有了解码器,码农们不用考虑TCP的粘包、拆包的问题了。

LineBasedFrameDecoder:依次编译bytebuf中的可读字符,判断看是否有“\n”或者“\r\n”,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持单行的最大长度。如果连续读取到最大长度后,仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。

FixedLengthFrameDecoder:是固定长度解码器,它能按照指定的长度对消息进行自动解码,开发者不需要考虑TCP的粘包等问题。利用FixedLengthFrameDecoder解码,无论一次性接收到多少的数据,他都会按照构造函数中设置的长度进行解码;如果是半包消息,FixedLengthFrameDecoder会缓存半包消息并等待下一个包,到达后进行拼包,直到读取完整的包。

DelimiterBasedFrameDecoder:是自定义的分隔符解码,构造函数的第一个参数表示单个消息的最大长度,当达到该长度后仍然没有查到分隔符,就抛出TooLongFrameException异常,防止由于异常码流缺失分隔符导致的内存溢出。

 

下面的例子是自定义的分隔符解码器。

服务端Server:

public class Server { public static void main(String[] args) throws Exception { //1、第一个线程组 是用于处理服务器接收客户端连接的 EventLoopGroup bossGroup = new NioEventLoopGroup(); //2、第二个线程组 是用于进行网络通信的(网络读写的) EventLoopGroup workGroup = new NioEventLoopGroup(); //3、创建一个辅助工具类Bootstrap,用于服务通道的一系列配置 ServerBootstrap sb = new ServerBootstrap(); sb.group(bossGroup, workGroup) //绑定两个线程组 .channel(NioServerSocketChannel.class) //指定NIO的模式 .option(ChannelOption.SO_BACKLOG, 1014) //设置tcp缓冲区大小 .option(ChannelOption.SO_SNDBUF, 32*2014) //设置发送缓冲区大小 .option(ChannelOption.SO_RCVBUF,32*2014) //设置接受缓冲区大小 .option(ChannelOption.SO_KEEPALIVE, true) //保持连接 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //为了解决拆包粘包的问题,我们可以设置特殊的分隔符 ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes()); //第一个参数表示单个消息的最大长度 第二个参数就是特殊分隔符 (当消息到了最大长度还没查到分隔符,那么就会报错TooLongFrameException异常) sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); //设置字符串形式的编码 sc.pipeline().addLast(new StringEncoder()); //设置字符串形式的解码,以后在Handler那里获取的msg就是String类型的了。 sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ServerHandler()); //配置具体数据接收方法的处理器 } }); //4、绑定服务端指定的端口,进行监听 ChannelFuture f = sb.bind(8765); //Thread.sleep(1000000); //5、等待关闭 f.channel().closeFuture().sync(); bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } }

服务器事件处理器ServerHandler:

public class ServerHandler extends ChannelHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //因为配置了字符串形式的解码,所以msg会是String类型 String data = (String)msg; System.out.println("Server接收到的数据:"+data); //服务器给客户端返回数据 注意,返回的字符串最后一定得加上设置的分隔符$_ String str = "客户端你好$_"; ctx.writeAndFlush(str); //直接调用writeAndFuls方法就直接写完就flush } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

客户端Client:

public class Client { public static void main(String[] args) throws Exception { //1、创建线程组,客户端和服务端不一样,客户端只需要一个即可 EventLoopGroup workGroup = new NioEventLoopGroup(); //创建辅助类,和服务端的不一样,服务端的是ServerBootstrap,而客户端只是BootStrap Bootstrap bs = new Bootstrap(); //加入线程组 bs.group(workGroup) //指定通道类型 .channel(NioSocketChannel.class) //绑定事件处理器 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //设置特殊分隔符 ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes()); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); //设置字符串形式的编码 sc.pipeline().addLast(new StringEncoder()); //设置字符串解码 sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ClientHandler()); } }); //链接服务端 ChannelFuture cf = bs.connect("127.0.0.1", 8765).sync(); //给服务端写数据,现在不再需要是写缓冲区了,因为我们上面的配置加了字符串的编码,直接写入字符串即可。 cf.channel().writeAndFlush("服务端你好$_"); //记得加上$_,因为这个分隔符是拿来判断消息是否该发送的。 //异步监听管道的关闭,如果关闭了就往下继续执行 cf.channel().closeFuture().sync(); workGroup.shutdownGracefully(); } }

客户端事件处理器ClientHandler:

public class ClientHandler extends ChannelHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try{ //接收服务端返回的数据 String data = (String)msg; System.out.println("Client接收到的数据:"+data); }finally{ //因为没有进行写操作,所以需要自己来释放 ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

执行结果:

服务端的:确实完美地接收到了 Server接收到的数据:服务 Server接收到的数据:服务端 Server接收到的数据:服务端你 Server接收到的数据:服务端你好 客户端:也收到了服务端四次的回复 Client接收到的数据:客户端你好 Client接收到的数据:客户端你好 Client接收到的数据:客户端你好 Client接收到的数据:客户端你好

 

转载请注明原文地址: https://www.6miu.com/read-2624094.html

最新回复(0)