netty——LengthFieldBasedFrameDecoder实例(解决半包)

xiaoxiao2021-02-28  26

1,使用LengthFieldPrepender编码,LengthFieldBasedFrameDecoder解码的netty传输

可以解决半包粘包

2 代码部分

tcpserver

[java]  view plain  copy  print ? package com.bimatrix.revit.nettyTest;      import io.netty.bootstrap.ServerBootstrap;   import io.netty.channel.ChannelFuture;   import io.netty.channel.ChannelInitializer;   import io.netty.channel.ChannelPipeline;   import io.netty.channel.EventLoopGroup;   import io.netty.channel.nio.NioEventLoopGroup;   import io.netty.channel.socket.SocketChannel;   import io.netty.channel.socket.nio.NioServerSocketChannel;   import io.netty.handler.codec.LengthFieldBasedFrameDecoder;   import io.netty.handler.codec.LengthFieldPrepender;   import io.netty.handler.codec.string.StringDecoder;   import io.netty.handler.codec.string.StringEncoder;   import io.netty.util.CharsetUtil;      import org.apache.log4j.Logger;        public class TcpServer {              private static final Logger logger = Logger.getLogger(TcpServer.class);         private static final String IP = "127.0.0.1";         private static final int PORT = 9999;         /**用于分配处理业务线程的线程组个数 */         protected static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors()*2//默认         /** 业务出现线程大小*/         protected static final int BIZTHREADSIZE = 4;             /*        * NioEventLoopGroup实际上就是个线程池,        * NioEventLoopGroup在后台启动了n个NioEventLoop来处理Channel事件,        * 每一个NioEventLoop负责处理m个Channel,        * NioEventLoopGroup从NioEventLoop数组里挨个取出NioEventLoop来处理Channel        */         private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);         private static final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE);                  protected static void run() throws Exception {             ServerBootstrap b = new ServerBootstrap();             b.group(bossGroup, workerGroup);             b.channel(NioServerSocketChannel.class);             b.childHandler(new ChannelInitializer<SocketChannel>() {                 @Override                 public void initChannel(SocketChannel ch) throws Exception {                     ChannelPipeline pipeline = ch.pipeline();                     pipeline.addLast("frameDecoder"new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0404));                     pipeline.addLast("frameEncoder"new LengthFieldPrepender(4));                     pipeline.addLast("decoder"new StringDecoder(CharsetUtil.UTF_8));                     pipeline.addLast("encoder"new StringEncoder(CharsetUtil.UTF_8));                     pipeline.addLast(new TcpServerHandler());                 }             });                  // b.bind(IP, PORT).sync();             ChannelFuture f = b.bind(PORT).sync(); // (7)                 f.channel().closeFuture().sync();                                 logger.info("TCP服务器已启动");         }                  protected static void shutdown() {             workerGroup.shutdownGracefully();             bossGroup.shutdownGracefully();         }              public static void main(String[] args) throws Exception {             logger.info("开始启动TCP服务器...");             TcpServer.run();     //      TcpServer.shutdown();         }     }    

TcpServerHandler

[java]  view plain  copy  print ? package com.bimatrix.revit.nettyTest;            import io.netty.channel.ChannelHandlerAdapter;   import io.netty.channel.ChannelHandlerContext;        public class TcpServerHandler extends ChannelHandlerAdapter  {              @Override       public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)          ctx.write(msg); // (1)          String obj= (String)msg;          System.out.println("访问数据"+obj);         // ctx.write(obj); // (1)          ctx.flush(); // (2)          }          @Override       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)             // Close theconnection when an exception is raised.          cause.printStackTrace();          ctx.close();       }       @Override       public void channelActive(final ChannelHandlerContext ctx) {               ctx.writeAndFlush("有客户端连接"); // (3)       }        }     TcpClientHander

[java]  view plain  copy  print ? <span style="font-size:18px;">package com.bimatrix.revit.nettyTest;      import io.netty.channel.ChannelHandlerAdapter;   import io.netty.channel.ChannelHandlerContext;      public class TcpClientHandler extends ChannelHandlerAdapter {         public TcpClientHandler() {         }         //private byte[] req;       /**        * 链路链接成功        */         @Override         public void channelActive(ChannelHandlerContext ctx) throws Exception {             /*for (int i = 0; i < 1000; i++) {              ctx.writeAndFlush("1ac");          } */           // 链接成功后发送         }              @Override         public void channelRead(ChannelHandlerContext ctx, Object msg)                 throws Exception {              System.out.println("client收到数据" +msg);   //       ctx.write("收到数据!");    //       ctx.write(msg);    //       ctx.write("w2d");             }              @Override         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {             ctx.flush();         }              @Override         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)                 throws Exception {             cause.printStackTrace();             ctx.close();         }     }  </span>  

TcpClient

[java]  view plain  copy  print ? package com.bimatrix.revit.nettyTest;      import io.netty.bootstrap.Bootstrap;     import io.netty.channel.Channel;     import io.netty.channel.ChannelInitializer;     import io.netty.channel.ChannelOption;     import io.netty.channel.ChannelPipeline;     import io.netty.channel.EventLoopGroup;     import io.netty.channel.nio.NioEventLoopGroup;     import io.netty.channel.socket.nio.NioSocketChannel;     import io.netty.handler.codec.LengthFieldBasedFrameDecoder;     import io.netty.handler.codec.LengthFieldPrepender;     import io.netty.handler.codec.string.StringDecoder;     import io.netty.handler.codec.string.StringEncoder;     import io.netty.util.CharsetUtil;          import org.apache.log4j.Logger;          public class TcpClient {         private static final Logger logger = Logger.getLogger(TcpClient.class);         public static String HOST = "127.0.0.1";         public static int PORT = 9999;                  public static Bootstrap bootstrap = getBootstrap();         public static Channel channel = getChannel(HOST,PORT);         /**        * 初始化Bootstrap        * @return        */         public static final Bootstrap getBootstrap(){             EventLoopGroup group = new NioEventLoopGroup();             Bootstrap b = new Bootstrap();             b.group(group).channel(NioSocketChannel.class);             b.handler(new ChannelInitializer<Channel>() {                 @Override                 protected void initChannel(Channel ch) throws Exception {                     ChannelPipeline pipeline = ch.pipeline();                     pipeline.addLast("frameDecoder"new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0404));                     pipeline.addLast("frameEncoder"new LengthFieldPrepender(4));                     pipeline.addLast("decoder"new StringDecoder(CharsetUtil.UTF_8));                     pipeline.addLast("encoder"new StringEncoder(CharsetUtil.UTF_8));                     pipeline.addLast("handler"new TcpClientHandler());                 }             });             b.option(ChannelOption.SO_KEEPALIVE, true);             return b;         }              public static final Channel getChannel(String host,int port){             Channel channel = null;             try {                 channel = bootstrap.connect(host, port).sync().channel();             } catch (Exception e) {                 logger.error(String.format("连接Server(IP[%s],PORT[%s])失败", host,port),e);                 return null;             }             return channel;         }              public static void sendMsg(String msg) throws Exception {             if(channel!=null){                 channel.writeAndFlush(msg).sync();             }else{                 logger.warn("消息发送失败,连接尚未建立!");             }         }              public static void main(String[] args) throws Exception {               try {                 long t0 = System.nanoTime();                 for (int i = 0; i < 100000; i++) {                    if(i%3==0){                        TcpClient.sendMsg(i+"11aaa222aaa");                    }else if(i%3==1){                        TcpClient.sendMsg(i+"222cccdddbbbbbbbbbbbb");                    }else if(i%3==2){                    TcpClient.sendMsg(i+"333cccddda12ccccccccccccccccccccccccc");                    }               }                 long t1 = System.nanoTime();                 System.out.println((t1-t0)/1000000.0);             } catch (Exception e) {                 e.printStackTrace();             }         }     }    

上面代码部分下面这一块 主要测试消息发送是否会造成粘包  for (int i = 0; i < 100000; i++) {               if(i%3==0){              TcpClient.sendMsg(i+"11aaa222aaa");               }else if(i%3==1){              TcpClient.sendMsg(i+"222cccdddbbbbbbbbbbbb");               }else if(i%3==2){             TcpClient.sendMsg(i+"333cccddda12ccccccccccccccccccccccccc");               }             }  

1,使用LengthFieldPrepender编码,LengthFieldBasedFrameDecoder解码的netty传输

可以解决半包粘包

2 代码部分

tcpserver

[java]  view plain  copy  print ? package com.bimatrix.revit.nettyTest;      import io.netty.bootstrap.ServerBootstrap;   import io.netty.channel.ChannelFuture;   import io.netty.channel.ChannelInitializer;   import io.netty.channel.ChannelPipeline;   import io.netty.channel.EventLoopGroup;   import io.netty.channel.nio.NioEventLoopGroup;   import io.netty.channel.socket.SocketChannel;   import io.netty.channel.socket.nio.NioServerSocketChannel;   import io.netty.handler.codec.LengthFieldBasedFrameDecoder;   import io.netty.handler.codec.LengthFieldPrepender;   import io.netty.handler.codec.string.StringDecoder;   import io.netty.handler.codec.string.StringEncoder;   import io.netty.util.CharsetUtil;      import org.apache.log4j.Logger;        public class TcpServer {              private static final Logger logger = Logger.getLogger(TcpServer.class);         private static final String IP = "127.0.0.1";         private static final int PORT = 9999;         /**用于分配处理业务线程的线程组个数 */         protected static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors()*2//默认         /** 业务出现线程大小*/         protected static final int BIZTHREADSIZE = 4;             /*        * NioEventLoopGroup实际上就是个线程池,        * NioEventLoopGroup在后台启动了n个NioEventLoop来处理Channel事件,        * 每一个NioEventLoop负责处理m个Channel,        * NioEventLoopGroup从NioEventLoop数组里挨个取出NioEventLoop来处理Channel        */         private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);         private static final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE);                  protected static void run() throws Exception {             ServerBootstrap b = new ServerBootstrap();             b.group(bossGroup, workerGroup);             b.channel(NioServerSocketChannel.class);             b.childHandler(new ChannelInitializer<SocketChannel>() {                 @Override                 public void initChannel(SocketChannel ch) throws Exception {                     ChannelPipeline pipeline = ch.pipeline();                     pipeline.addLast("frameDecoder"new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0404));                     pipeline.addLast("frameEncoder"new LengthFieldPrepender(4));                     pipeline.addLast("decoder"new StringDecoder(CharsetUtil.UTF_8));                     pipeline.addLast("encoder"new StringEncoder(CharsetUtil.UTF_8));                     pipeline.addLast(new TcpServerHandler());                 }             });                  // b.bind(IP, PORT).sync();             ChannelFuture f = b.bind(PORT).sync(); // (7)                 f.channel().closeFuture().sync();                                 logger.info("TCP服务器已启动");         }                  protected static void shutdown() {             workerGroup.shutdownGracefully();             bossGroup.shutdownGracefully();         }              public static void main(String[] args) throws Exception {             logger.info("开始启动TCP服务器...");             TcpServer.run();     //      TcpServer.shutdown();         }     }    

TcpServerHandler

[java]  view plain  copy  print ? package com.bimatrix.revit.nettyTest;            import io.netty.channel.ChannelHandlerAdapter;   import io.netty.channel.ChannelHandlerContext;        public class TcpServerHandler extends ChannelHandlerAdapter  {              @Override       public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)          ctx.write(msg); // (1)          String obj= (String)msg;          System.out.println("访问数据"+obj);         // ctx.write(obj); // (1)          ctx.flush(); // (2)          }          @Override       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)             // Close theconnection when an exception is raised.          cause.printStackTrace();          ctx.close();       }       @Override       public void channelActive(final ChannelHandlerContext ctx) {               ctx.writeAndFlush("有客户端连接"); // (3)       }        }     TcpClientHander

[java]  view plain  copy  print ? <span style="font-size:18px;">package com.bimatrix.revit.nettyTest;      import io.netty.channel.ChannelHandlerAdapter;   import io.netty.channel.ChannelHandlerContext;      public class TcpClientHandler extends ChannelHandlerAdapter {         public TcpClientHandler() {         }         //private byte[] req;       /**        * 链路链接成功        */         @Override         public void channelActive(ChannelHandlerContext ctx) throws Exception {             /*for (int i = 0; i < 1000; i++) {              ctx.writeAndFlush("1ac");          } */           // 链接成功后发送         }              @Override         public void channelRead(ChannelHandlerContext ctx, Object msg)                 throws Exception {              System.out.println("client收到数据" +msg);   //       ctx.write("收到数据!");    //       ctx.write(msg);    //       ctx.write("w2d");             }              @Override         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {             ctx.flush();         }              @Override         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)                 throws Exception {             cause.printStackTrace();             ctx.close();         }     }  </span>  

TcpClient

[java]  view plain  copy  print ? package com.bimatrix.revit.nettyTest;      import io.netty.bootstrap.Bootstrap;     import io.netty.channel.Channel;     import io.netty.channel.ChannelInitializer;     import io.netty.channel.ChannelOption;     import io.netty.channel.ChannelPipeline;     import io.netty.channel.EventLoopGroup;     import io.netty.channel.nio.NioEventLoopGroup;     import io.netty.channel.socket.nio.NioSocketChannel;     import io.netty.handler.codec.LengthFieldBasedFrameDecoder;     import io.netty.handler.codec.LengthFieldPrepender;     import io.netty.handler.codec.string.StringDecoder;     import io.netty.handler.codec.string.StringEncoder;     import io.netty.util.CharsetUtil;          import org.apache.log4j.Logger;          public class TcpClient {         private static final Logger logger = Logger.getLogger(TcpClient.class);         public static String HOST = "127.0.0.1";         public static int PORT = 9999;                  public static Bootstrap bootstrap = getBootstrap();         public static Channel channel = getChannel(HOST,PORT);         /**        * 初始化Bootstrap        * @return        */         public static final Bootstrap getBootstrap(){             EventLoopGroup group = new NioEventLoopGroup();             Bootstrap b = new Bootstrap();             b.group(group).channel(NioSocketChannel.class);             b.handler(new ChannelInitializer<Channel>() {                 @Override                 protected void initChannel(Channel ch) throws Exception {                     ChannelPipeline pipeline = ch.pipeline();                     pipeline.addLast("frameDecoder"new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0404));                     pipeline.addLast("frameEncoder"new LengthFieldPrepender(4));                     pipeline.addLast("decoder"new StringDecoder(CharsetUtil.UTF_8));                     pipeline.addLast("encoder"new StringEncoder(CharsetUtil.UTF_8));                     pipeline.addLast("handler"new TcpClientHandler());                 }             });             b.option(ChannelOption.SO_KEEPALIVE, true);             return b;         }              public static final Channel getChannel(String host,int port){             Channel channel = null;             try {                 channel = bootstrap.connect(host, port).sync().channel();             } catch (Exception e) {                 logger.error(String.format("连接Server(IP[%s],PORT[%s])失败", host,port),e);                 return null;             }             return channel;         }              public static void sendMsg(String msg) throws Exception {             if(channel!=null){                 channel.writeAndFlush(msg).sync();             }else{                 logger.warn("消息发送失败,连接尚未建立!");             }         }              public static void main(String[] args) throws Exception {               try {                 long t0 = System.nanoTime();                 for (int i = 0; i < 100000; i++) {                    if(i%3==0){                        TcpClient.sendMsg(i+"11aaa222aaa");                    }else if(i%3==1){                        TcpClient.sendMsg(i+"222cccdddbbbbbbbbbbbb");                    }else if(i%3==2){                    TcpClient.sendMsg(i+"333cccddda12ccccccccccccccccccccccccc");                    }               }                 long t1 = System.nanoTime();                 System.out.println((t1-t0)/1000000.0);             } catch (Exception e) {                 e.printStackTrace();             }         }     }    

上面代码部分下面这一块 主要测试消息发送是否会造成粘包  for (int i = 0; i < 100000; i++) {               if(i%3==0){              TcpClient.sendMsg(i+"11aaa222aaa");               }else if(i%3==1){              TcpClient.sendMsg(i+"222cccdddbbbbbbbbbbbb");               }else if(i%3==2){             TcpClient.sendMsg(i+"333cccddda12ccccccccccccccccccccccccc");               }             }  
转载请注明原文地址: https://www.6miu.com/read-800053.html

最新回复(0)