1. 使用 LengthFieldPrepender 编码、LengthFieldBasedFrameDecoder 解码的 Netty 传输,
可以解决 TCP 的半包、粘包问题。
2 代码部分
tcpserver
[java] view plain copy print ? package com.bimatrix.revit.nettyTest; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import org.apache.log4j.Logger; public class TcpServer { private static final Logger logger = Logger.getLogger(TcpServer.class); private static final String IP = "127.0.0.1"; private static final int PORT = 9999; /**用于分配处理业务线程的线程组个数 */ protected static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors()*2; //默认 /** 业务出现线程大小*/ protected static final int BIZTHREADSIZE = 4; /* * NioEventLoopGroup实际上就是个线程池, * NioEventLoopGroup在后台启动了n个NioEventLoop来处理Channel事件, * 每一个NioEventLoop负责处理m个Channel, * NioEventLoopGroup从NioEventLoop数组里挨个取出NioEventLoop来处理Channel */ private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE); private static final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE); protected static void run() throws Exception { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup); b.channel(NioServerSocketChannel.class); b.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); 
pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new TcpServerHandler()); } }); // b.bind(IP, PORT).sync(); ChannelFuture f = b.bind(PORT).sync(); // (7) f.channel().closeFuture().sync(); logger.info("TCP服务器已启动"); } protected static void shutdown() { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } public static void main(String[] args) throws Exception { logger.info("开始启动TCP服务器..."); TcpServer.run(); // TcpServer.shutdown(); } }
TcpServerHandler
package com.bimatrix.revit.nettyTest;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * Server-side handler: echoes every received message back to the peer and
 * prints it to standard output.
 */
public class TcpServerHandler extends ChannelHandlerAdapter {

    /**
     * Sends a greeting to the peer as soon as the channel becomes active.
     *
     * @param ctx context of the newly active channel
     */
    @Override
    public void channelActive(final ChannelHandlerContext ctx) {
        ctx.writeAndFlush("有客户端连接");
    }

    /**
     * Queues the received message for echoing, prints it, then flushes.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the received message; cast to {@link String} for printing
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);
        final String text = (String) msg;
        System.out.println("访问数据" + text);
        ctx.flush();
    }

    /**
     * Prints the stack trace and closes the connection when an exception is raised.
     *
     * @param ctx   context of the failing channel
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

// ---- TcpClientHandler (separate source file) ----

package com.bimatrix.revit.nettyTest;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * Client-side handler: prints every message received from the server.
 */
public class TcpClientHandler extends ChannelHandlerAdapter {

    /**
     * Called when the connection has been established; intentionally does nothing.
     *
     * @param ctx context of the newly active channel
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Nothing is sent on connect.
    }

    /**
     * Prints the received message to standard output.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the received message
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("client收到数据" + msg);
    }

    /**
     * Flushes the channel once the current batch of reads is complete.
     *
     * @param ctx context of the channel
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Prints the stack trace and closes the connection.
     *
     * @param ctx   context of the failing channel
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
TcpClient
[java] view plain copy print ? package com.bimatrix.revit.nettyTest; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import org.apache.log4j.Logger; public class TcpClient { private static final Logger logger = Logger.getLogger(TcpClient.class); public static String HOST = "127.0.0.1"; public static int PORT = 9999; public static Bootstrap bootstrap = getBootstrap(); public static Channel channel = getChannel(HOST,PORT); /** * 初始化Bootstrap * @return */ public static final Bootstrap getBootstrap(){ EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class); b.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast("handler", new TcpClientHandler()); } }); b.option(ChannelOption.SO_KEEPALIVE, true); return b; } public static final Channel getChannel(String host,int port){ Channel channel = null; try { channel = bootstrap.connect(host, port).sync().channel(); } catch (Exception e) { logger.error(String.format("连接Server(IP[%s],PORT[%s])失败", host,port),e); return null; } 
return channel; } public static void sendMsg(String msg) throws Exception { if(channel!=null){ channel.writeAndFlush(msg).sync(); }else{ logger.warn("消息发送失败,连接尚未建立!"); } } public static void main(String[] args) throws Exception { try { long t0 = System.nanoTime(); for (int i = 0; i < 100000; i++) { if(i%3==0){ TcpClient.sendMsg(i+"11aaa222aaa"); }else if(i%3==1){ TcpClient.sendMsg(i+"222cccdddbbbbbbbbbbbb"); }else if(i%3==2){ TcpClient.sendMsg(i+"333cccddda12ccccccccccccccccccccccccc"); } } long t1 = System.nanoTime(); System.out.println((t1-t0)/1000000.0); } catch (Exception e) { e.printStackTrace(); } } }
上面 TcpClient 的 main 方法中的下面这段循环,主要用于测试连续发送消息时是否会出现粘包:
for (int i = 0; i < 100000; i++) {
    if (i % 3 == 0) {
        TcpClient.sendMsg(i + "11aaa222aaa");
    } else if (i % 3 == 1) {
        TcpClient.sendMsg(i + "222cccdddbbbbbbbbbbbb");
    } else if (i % 3 == 2) {
        TcpClient.sendMsg(i + "333cccddda12ccccccccccccccccccccccccc");
    }
}
1. 使用 LengthFieldPrepender 编码、LengthFieldBasedFrameDecoder 解码的 Netty 传输,
可以解决 TCP 的半包、粘包问题。
2 代码部分
tcpserver
[java] view plain copy print ? package com.bimatrix.revit.nettyTest; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import org.apache.log4j.Logger; public class TcpServer { private static final Logger logger = Logger.getLogger(TcpServer.class); private static final String IP = "127.0.0.1"; private static final int PORT = 9999; /**用于分配处理业务线程的线程组个数 */ protected static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors()*2; //默认 /** 业务出现线程大小*/ protected static final int BIZTHREADSIZE = 4; /* * NioEventLoopGroup实际上就是个线程池, * NioEventLoopGroup在后台启动了n个NioEventLoop来处理Channel事件, * 每一个NioEventLoop负责处理m个Channel, * NioEventLoopGroup从NioEventLoop数组里挨个取出NioEventLoop来处理Channel */ private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE); private static final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE); protected static void run() throws Exception { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup); b.channel(NioServerSocketChannel.class); b.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); 
pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new TcpServerHandler()); } }); // b.bind(IP, PORT).sync(); ChannelFuture f = b.bind(PORT).sync(); // (7) f.channel().closeFuture().sync(); logger.info("TCP服务器已启动"); } protected static void shutdown() { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } public static void main(String[] args) throws Exception { logger.info("开始启动TCP服务器..."); TcpServer.run(); // TcpServer.shutdown(); } }
TcpServerHandler
package com.bimatrix.revit.nettyTest;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * Server-side handler: echoes every received message back to the peer and
 * prints it to standard output.
 */
public class TcpServerHandler extends ChannelHandlerAdapter {

    /**
     * Sends a greeting to the peer as soon as the channel becomes active.
     *
     * @param ctx context of the newly active channel
     */
    @Override
    public void channelActive(final ChannelHandlerContext ctx) {
        ctx.writeAndFlush("有客户端连接");
    }

    /**
     * Queues the received message for echoing, prints it, then flushes.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the received message; cast to {@link String} for printing
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);
        final String text = (String) msg;
        System.out.println("访问数据" + text);
        ctx.flush();
    }

    /**
     * Prints the stack trace and closes the connection when an exception is raised.
     *
     * @param ctx   context of the failing channel
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

// ---- TcpClientHandler (separate source file) ----

package com.bimatrix.revit.nettyTest;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * Client-side handler: prints every message received from the server.
 */
public class TcpClientHandler extends ChannelHandlerAdapter {

    /**
     * Called when the connection has been established; intentionally does nothing.
     *
     * @param ctx context of the newly active channel
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Nothing is sent on connect.
    }

    /**
     * Prints the received message to standard output.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the received message
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("client收到数据" + msg);
    }

    /**
     * Flushes the channel once the current batch of reads is complete.
     *
     * @param ctx context of the channel
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Prints the stack trace and closes the connection.
     *
     * @param ctx   context of the failing channel
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
TcpClient
[java] view plain copy print ? package com.bimatrix.revit.nettyTest; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import org.apache.log4j.Logger; public class TcpClient { private static final Logger logger = Logger.getLogger(TcpClient.class); public static String HOST = "127.0.0.1"; public static int PORT = 9999; public static Bootstrap bootstrap = getBootstrap(); public static Channel channel = getChannel(HOST,PORT); /** * 初始化Bootstrap * @return */ public static final Bootstrap getBootstrap(){ EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class); b.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast("handler", new TcpClientHandler()); } }); b.option(ChannelOption.SO_KEEPALIVE, true); return b; } public static final Channel getChannel(String host,int port){ Channel channel = null; try { channel = bootstrap.connect(host, port).sync().channel(); } catch (Exception e) { logger.error(String.format("连接Server(IP[%s],PORT[%s])失败", host,port),e); return null; } 
return channel; } public static void sendMsg(String msg) throws Exception { if(channel!=null){ channel.writeAndFlush(msg).sync(); }else{ logger.warn("消息发送失败,连接尚未建立!"); } } public static void main(String[] args) throws Exception { try { long t0 = System.nanoTime(); for (int i = 0; i < 100000; i++) { if(i%3==0){ TcpClient.sendMsg(i+"11aaa222aaa"); }else if(i%3==1){ TcpClient.sendMsg(i+"222cccdddbbbbbbbbbbbb"); }else if(i%3==2){ TcpClient.sendMsg(i+"333cccddda12ccccccccccccccccccccccccc"); } } long t1 = System.nanoTime(); System.out.println((t1-t0)/1000000.0); } catch (Exception e) { e.printStackTrace(); } } }
上面 TcpClient 的 main 方法中的下面这段循环,主要用于测试连续发送消息时是否会出现粘包:
for (int i = 0; i < 100000; i++) {
    if (i % 3 == 0) {
        TcpClient.sendMsg(i + "11aaa222aaa");
    } else if (i % 3 == 1) {
        TcpClient.sendMsg(i + "222cccdddbbbbbbbbbbbb");
    } else if (i % 3 == 2) {
        TcpClient.sendMsg(i + "333cccddda12ccccccccccccccccccccccccc");
    }
}