使用netty进行客户端网络编程及实现断线重连功能

xiaoxiao2021-02-28  18

最近使用netty搭建了一个服务端和一个客户端网络通信的demo,记录一下,不多说,先上项目结构图

当时maven有点问题,所以直接引入的jar包,反正也只有一个。(ClientHandler和ServerHandler类分别用HeartBeatClientHandler和HeartBeatServerHandler代替)

搭建服务端之前还有一些事情要做,对,就是自定义协议,还有编码解码

这部分是参考了网上的一些资料

A  首先是协议部分

    1 新建LuckHeader.java 

package luck; public class LuckHeader { // 协议版本 private int version; // 消息内容长度 private int contentLength; // 服务名称 private String sessionId; public LuckHeader(int version, int contentLength, String sessionId) { this.version = version; this.contentLength = contentLength; this.sessionId = sessionId; } public int getVersion() { return version; } public void setVersion(int version) { this.version = version; } public int getContentLength() { return contentLength; } public void setContentLength(int contentLength) { this.contentLength = contentLength; } public String getSessionId() { return sessionId; } public void setSessionId(String sessionId) { this.sessionId = sessionId; } }

这个属于协议头部分,下面是消息内容部分

    2 新建LuckMessage.java

package luck; public class LuckMessage { private LuckHeader luckHeader; private String content; public LuckMessage(LuckHeader luckHeader, String content) { this.luckHeader = luckHeader; this.content = content; } public LuckHeader getLuckHeader() { return luckHeader; } public void setLuckHeader(LuckHeader luckHeader) { this.luckHeader = luckHeader; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } @Override public String toString() { return String.format("[version=%d,contentLength=%d,sessionId=%s,content=%s]", luckHeader.getVersion(), luckHeader.getContentLength(), luckHeader.getSessionId(), content); } }

到这里一个简单的luck协议的消息的模板就做好了.

B 接下来是编码解码部分

    1 新建LuckDecoder.java

package encode; import java.util.List; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import luck.LuckHeader; import luck.LuckMessage; public class LuckDecoder extends ByteToMessageDecoder{ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { //获取协议版本 int version = in.readInt(); //获取消息长度 int contentLength = in.readInt(); //获取sessionid byte[] sessionByte = new byte[36]; in.readBytes(sessionByte); String sessionid = new String(sessionByte); //组装协议头 LuckHeader hearder = new LuckHeader(version, contentLength, sessionid); //读取消息内容 byte[] content = in.readBytes(in.readableBytes()).array(); LuckMessage message = new LuckMessage(hearder, new String(content)); out.add(message); } }

2 新建LuckEncoder.java

package encode; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import luck.LuckHeader; import luck.LuckMessage; public class LuckEncoder extends MessageToByteEncoder<LuckMessage>{ @Override protected void encode(ChannelHandlerContext ctx, LuckMessage message, ByteBuf in) throws Exception { //将message转换成二进制数据 LuckHeader header = message.getLuckHeader(); //写入的顺序就是协议的顺序 //标题头信息 in.writeInt(header.getVersion()); in.writeInt(message.getContent().length()); in.writeBytes(header.getSessionId().getBytes()); //主题信息 in.writeBytes(message.getContent().getBytes()); } }

注意一下 ,这2个类的编码解码的顺序要一致才可以,不然会报错

  好,准备工作做完了,接下来重头戏就是搭建服务端了

  新建Server.java

package server; import java.util.concurrent.TimeUnit; import encode.LuckDecoder; import encode.LuckEncoder; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.timeout.IdleStateHandler; /** * Netty通信的步骤: ①创建两个NIO线程组,一个专门用于网络事件处理(接受客户端的连接),另一个则进行网络通信的读写。 ②创建一个ServerBootstrap对象,配置Netty的一系列参数,例如接受传出数据的缓存大小等。 ③创建一个用于实际处理数据的类ChannelInitializer,进行初始化的准备工作,比如设置接受传出数据的字符集、格式以及实际处理数据的接口。 ④绑定端口,执行同步阻塞方法等待服务器端启动即可。 * @author Administrator * */ public class Server { private int port; public Server(int port){ this.port = port; } public void run(){ EventLoopGroup bossGroup = new NioEventLoopGroup(); //用于处理服务器端接受客户端 EventLoopGroup workerGroup = new NioEventLoopGroup(); //进行网络通信(读写) try{ ServerBootstrap bootstrap = new ServerBootstrap();//辅助工具类,用于服务器通道的一系列配置 bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { //配置具体的数据处理方式                      @Override              protected void initChannel(SocketChannel socketChannel) throws Exception {              ChannelPipeline pipeline = socketChannel.pipeline();          pipeline.addLast(new IdleStateHandler(5, 0, 0,TimeUnit.SECONDS));          pipeline.addLast(new LuckEncoder());          pipeline.addLast(new LuckDecoder());          //处理事件的类          //pipeline.addLast(new ServerHandler());          pipeline.addLast(new HeartBeatServerHandler());                  }                  }) .option(ChannelOption.SO_BACKLOG, 128)//设置tcp缓冲区(// 保持连接数 ) .option(ChannelOption.SO_SNDBUF, 32*1024)//设置发送数据缓冲大小 .option(ChannelOption.SO_RCVBUF, 32*1024)//设置接受数据缓冲大小 .childOption(ChannelOption.SO_KEEPALIVE , true)//保持连接 .option(ChannelOption.TCP_NODELAY, true);//有数据立即发送 ChannelFuture future = bootstrap.bind(port).sync(); future.channel().closeFuture().sync(); }catch(Exception e){ e.printStackTrace(); }finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) { new Server(8077).run(); } }

服务端有2个线程池,一个处理客户端的连接,一个处理io任务,ServerBootstrap对象用于启动NIO服务端的辅助启动类(目的是降低服务端的开发复杂度),IdleStateHandler类是服务端监听客户端心跳需要配置的handler,处理事件的类由HeartBeatServerHandler处理,然后由bootstrap绑定监听一个端口,监听客户端的连接

实现断线的类HeartBeatServerHandler.java,瞧好咯

package server; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.ReferenceCountUtil; import luck.LuckMessage; public class HeartBeatServerHandler extends SimpleChannelInboundHandler<LuckMessage>{ //连接失败次数 private int connectTime = 0; //定义最大未连接次数 private static final int MAX_UN_CON_TIME = 3; @Override protected void messageReceived(ChannelHandlerContext ctx, LuckMessage msg) throws Exception { try{ LuckMessage lmsg = (LuckMessage) msg; if (lmsg.getContent().equals("h")) { //心跳消息 //吧连接失败次数清0 System.out.println("Server接收到心跳消息 ,失败次数清0:" + msg.toString()); connectTime = 0 ; } else { System.out.println("Server接收到普通消息 :" + msg.toString()); } }catch(Exception e){ e.printStackTrace(); }finally { ReferenceCountUtil.release(msg); } } public void userEventTriggered(ChannelHandlerContext ctx, Object evt){ if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { //读超时 System.out.println("服务端读超时=====连接失败次数" + connectTime); if (connectTime >= MAX_UN_CON_TIME) { System.out.println("服务端关闭channel"); ctx.channel().close(); connectTime = 0; } else { connectTime ++; } }else if (event.state() == IdleState.WRITER_IDLE) { /*写超时*/ System.out.println("服务端写超时"); } else if (event.state() == IdleState.ALL_IDLE) { /*总超时*/ System.out.println("服务端全部超时"); } } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // cause.printStackTrace(); System.out.println("服务端发生错误:" + cause.getMessage()); ctx.channel().close(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("有客户端连接"); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 关闭,等待重连 ctx.close(); System.out.println("===服务端===(客户端失效)"); } }

messageReceived方法是接受到消息的时候调用的,这个里面我搞了一些骚操作,可以看可不看,主要是接受到的消息是否为心跳消息,是的话,把连接失败次数connectTime清0;其他方法 里面都有注释。

重点是userEventTriggered方法,IdleState定义了几个事件

读事件 READER_IDLE

写事件 WRITER_IDLE

全部事件 ALL_IDLE

这个很好理解  就是字面意思,这里用读事件举得例子,也就是服务端没有收到客户端的消息(读到客户端的消息)的时候就触发该事件,当超过三次没有收到客户端的消息的时候 就断开与改客户端的连接ctx.channel().close();一个连接相当于一个channel。

服务端配置好了  接下来配置客户端

新建Client.java

package client; import java.net.InetSocketAddress; import java.util.Random; import java.util.Scanner; import java.util.UUID; import java.util.concurrent.TimeUnit; import encode.LuckDecoder; import encode.LuckEncoder; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.IdleStateHandler; import luck.LuckHeader; import luck.LuckMessage; public class Client { private static Channel channel; private static Bootstrap bootstrap; private static ChannelFutureListener channelFutureListener = null; public static void main(String[] args) throws Exception { new Client(); } public Client() throws Exception{ System.out.println("构造方法"); init(); sendData(); } static Scanner in = new Scanner(System.in); public static void sendData() throws Exception { System.out.println("senddata"); //组装协议信息 int version = 1; String sessionId = UUID.randomUUID().toString(); String content = ""; LuckHeader header = new LuckHeader(version, content.length(), sessionId); LuckMessage message = null; do { content = in.nextLine(); message = new LuckMessage(header, content); channel.writeAndFlush(message); } while (!content.equals("q")); } public void init() throws InterruptedException{ EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ bootstrap = new Bootstrap(); bootstrap.group(workerGroup) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new IdleStateHandler(0, 0, 5, TimeUnit.SECONDS)); pipeline.addLast("encoder",new LuckEncoder()); pipeline.addLast("decoder",new LuckDecoder()); // pipeline.addLast(new ClientHandler()); pipeline.addLast(new HeartBeatClientHanlder()); } }); //设置tcp协议的属性 bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.TCP_NODELAY, true); bootstrap.option(ChannelOption.SO_TIMEOUT, 5000); connect(); }catch(Exception e){ e.printStackTrace(); }finally { // workerGroup.shutdownGracefully(); } } /** * 重新连接服务器 * @throws InterruptedException */ public static void connect(){ if (channel != null && channel.isActive()) { return; } System.out.println("连接中"); ChannelFuture channelFuture = null; try { channelFuture = bootstrap.connect("192.168.1.12", 8077).sync(); channelFuture.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture futureListener) throws Exception { if (futureListener.isSuccess()) { channel = futureListener.channel(); System.out.println("连接成功"); } else { System.out.println("连接失败"); } } }); } catch (Exception e) { e.printStackTrace(); } } }

与服务端不同的是,客户端只有一个NioEventLoopGroup,其他配置都是大同小异,

说明一下 static Scanner in = new Scanner(System.in);这句话定义为类的静态变量是为了保证,如果在服务器切断了改客户端的连接,并且重连了以后,再进入到sendData方法中只有一个Scanner ;如果定义在sendData方法里面,重连的时候就要发送2次消息才能让服务端接收到,而且是作为一条消息发过去的  ,2条消息中间还会有乱码。重连几次就需要几条消息一起 才能发过去

ok,现在都写好了   运行试一下  

先启动服务端Server.java然后启动客户端Client.java

然后客户端不发送消息知道服务端断开连接

此时客户端的channelInactive方法会调用connect方法,重新连接

到这里断线重连的简单例子就写好了  欢迎各位大牛指点

转载请注明原文地址: https://www.6miu.com/read-2799981.html

最新回复(0)