带你玩转Netty(二)

xiaoxiao2021-02-28  118

概述

上篇给大家介绍了 Netty 的基本原理,这次主要给大家讲一下具体怎么用 Netty,主要分成两个方面客户端以及服务端实现,我们将实现一个简单的 Echo 程序。

服务端

服务端的实现,主要从这几个方面考虑: 1. 最佳线程模型,实现高并发,高稳定性 2. 容错机制 3. 业务处理 4. 心跳监测

那么用Netty怎么一一实现这些呢,废话不多说,直接上代码。

public class NormalNettyServer { private int serverPort = 9000; private String serverIp = "192.168.2.102"; public NormalNettyServer(int port){ serverPort = port; } public void start() throws Exception { // 创建Accpet线程池 (1) EventLoopGroup bossGroup = new NioEventLoopGroup(10); // 创建Work线程池 EventLoopGroup workGroup = new NioEventLoopGroup(10); try{ // 创建ServerBootstrap (2) ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workGroup). // (3) channel(NioServerSocketChannel.class). //(4) childHandler(new ChannelInitializer<SocketChannel>() { // 初始化处理handler (5) @Override public void initChannel(SocketChannel ch) throws Exception { // 加入用户心跳监测机制 读时间超时 60s 写时间超时 10s 读写都没有超时 10s ch.pipeline().addLast("timeout", new IdleStateHandler(60, 10, 10, TimeUnit.SECONDS)); // 加入业务处理handler ch.pipeline().addLast("echo", new EchoHandler()); } }).option(ChannelOption.SO_BACKLOG, 128) // (6) .childOption(ChannelOption.SO_KEEPALIVE, true); // Bind and start to accept incoming connections. ChannelFuture f = b.bind(serverIp, serverPort).sync(); // (7) f.channel().closeFuture().sync(); } finally { // 释放资源 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } } EventLoopGroup 是用来处理I/O操作多线程事件循环池, 我们知道Netty是基于Reactor模型的,其中一种是双线程池,这里就创建了两个线程循环池,管理10个线程,bossGroup主要用于处理accpet请求,建立连接后的处理主要是由workGroup负责。ServerBootstrap 是一个启动NIO服务的辅助启动类注册两个线程循环池。

NioServerSocketChannel这里直接引用官方说明更合适

A ServerSocketChannel implementation which uses NIO selector based implementation to accept new connections.

SocketChannel是TCP连接的网络通道,在下面两种情况会创建

打开一个SocketChannel连接某台服务器。一个新连接到达ServerSocketChannel时,会创建一个SocketChannel。 当通道建立,会调用初始化操作,将业务处理的handler加入到pipeline中。这里主要加入应用层心跳监测以及应用层业务处理通道支持参数配置,这里配置了两个参数SO_BACKLOG, SO_KEEPALIVE,具体作用大家可以看这里绑定host以及监听端口,返回的ChannelFuture, 由于这个过程是异步的,所有执行状态可以通过ChannelFuture中的获取,这将在客户端的实现中重点介绍。

通过以上步骤,我们就可以建立一个高效的服务,对于一个写C++的我,只能说真的很爽。言归正传,下面我们介绍一下两个handler的处理,看代码。

public class EchoHandler extends SimpleChannelInboundHandler<Object> { // (1) @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception { ByteBuf buf = (ByteBuf)o; // (2) byte[] packet = new byte[buf.readableBytes()]; buf.readBytes(packet); // pb 协议 (3) HelloTest.Hello hello = HelloTest.Hello.parseFrom(packet); System.out.println(hello.getContent().toString()); channelHandlerContext.channel().writeAndFlush(buf); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // (4) cause.printStackTrace(); ctx.close(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 新连接active super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 连接关闭 super.channelInactive(ctx); } Netty的接受数据处理Handler都是继承 SimpleChannelInboundHandler或者ChannelInboundHandlerAdapterNetty数据的收发,都采用ByteBuf这里采用了PB协议,具体用法,这里不详细讲,大家先看看,之后会有文章介绍。因为是Echo, 所有收到的数据直接原包返回。Channel异常处理,这里可以做一些容错操作。 public class HeartBeatHandler extends ChannelInboundHandlerAdapter { private ConnectionClient client; public HeartBeatHandler(ConnectionClient client) { this.client = client; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { super.channelRead(ctx, msg); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.WRITER_IDLE) { //TODO(1) 可以做监测处理 } else if (event.state() == IdleState.READER_IDLE) { // TODO(2) } } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); } } 写数据超时,服务端已经有一段时间没有对该连接通道发送数据。读数据超时,服务端已经有一段时间没有接受该连接通道的数据。通过这两个,我们可以对该通道做检查,比如发送心跳指令,监测客户端是否还存在。

这就是最简单服务端实现,麻雀虽小,五脏俱全,实现了我们最开始说的几个方面的内容。按照这个顺序,我们接下来介绍客户端实现。

客户端

客户端的实现往往都会有这样几个要求: * 断线重连 * 心跳维持 * 业务处理

接下来,我们将会用Netty来实现这些功能,由于代码比较多,下面主要截取重要函数做说明。

public boolean connect() { Bootstrap b = new Bootstrap(); // (1) final HeartBeatHandler hearthandler = new HeartBeatHandler(this); final ClientHandler handler = new ClientHandler(this); EventLoopGroup loop = new NioEventLoopGroup(); // (2) b.group(loop).channel(NioSocketChannel.class); b.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // idle状态处理,主要是用于与服务端发送心跳 (3) pipeline.addLast(new IdleStateHandler(60, 20, 0, TimeUnit.SECONDS)); pipeline.addLast("hearthandler", hearthandler); // 业务处理 pipeline.addLast("handler", handler); } }); b.option(ChannelOption.SO_KEEPALIVE, true); b.option(ChannelOption.TCP_NODELAY, true); ChannelFuture future = b.connect(host, port); future.addListener(new ConnectionListener(this)); // (4) return true; } Bootstrap客户端启动辅助类,与ServerBootstrap相对。客户端不需要处理连接请求,所有只需定义一个多线程事件循环池来处理channel事件就可以。客户端的业务处理handler,包含心跳处理,以及业务处理。

ChannelFuture添加连接情况监听,用于实现重连。

其他的含义与服务端相同,请参考服务端说明,这里重点介绍重连机制的实现,重连主要是由两个点来触发的,一个是ChannelFuture, 一个Handler的InActive事件。

public class ConnectionListener implements ChannelFutureListener { // 此处省略多行代码 ....... @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { // 连接失败 System.out.println("connect reconnect"); this.client.reconnect(future.channel()); } else { // 连接成功 System.out.println("connect success"); this.client.setChannel(future.channel()); } } } public class ClientHandler extends ChannelInboundHandlerAdapter { // 此处省略多行代码 ....... @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 断开连接 System.out.println("SuperServer is disconnect " + ctx.channel().remoteAddress().toString()); client.reconnect(ctx.channel()); super.channelInactive(ctx); } } public void reconnect(final Channel ch) { final EventLoop eventLoop = ch.eventLoop(); eventLoop.schedule(new Runnable() { @Override public void run() { connect(); System.out.println("reconnect server:" + host + ", Port:" + port); } }, 10L, TimeUnit.SECONDS); }

以上就是具体的重连的实现过程,两个触发点,大家记住即可。另外心跳的实现与服务端类似,只不过在业务的处理有可能不同。比如 服务要考虑是否断开客户端,回收资源。而客户端要考虑的是是否要重连。

欢迎关注公众号

转载请注明原文地址: https://www.6miu.com/read-32589.html

最新回复(0)