我们会通过Netty实现一个Echo服务器和客户端,客户端向服务器发送数据,服务器原样返回。 下图展示了这个Echo服务器的样子。
开始编写Echo Server: 所有的Netty 服务器都需要以下2个部分:
至少一个ChannelHandler - 实现了服务器处理接收到的数据Bootstrap - 这是能配置服务器的启动代码ChannelHandler和业务逻辑 因为Echo服务器会对输入的信息提供响应,它需要实现接口ChannelInboundHandler,这个接口为输入数据事件(inbound events)定义了处理函数。我们这个服务器只需要ChannelInboundHandler接口里面的一小部分方法,因此,只要继承ChannelInboundHandlerAdapter就够了,它提供了ChannelInboundHandler的一个默认实现。
我们感兴趣的是如下方法:
channelRead() - 只要有数据来就会触发channelReadComplete() - 通知handler,对channelRead()的最后一个调用是当前batch中的最后一个消息exceptionCaught() - 在读操作期间抛出了异常就会触发下面是Echo服务器的ChannelHandler实现类:
/** * @Sharable 表明这个ChannelHandler能被安全的被多个channels共享 */ @Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8)); ctx.write(in);//将收到的数据写入ByteBuf中(后面会发送给发送者)而没有flush } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).//flush buffer中的消息到远程节点然后关闭channel addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close();//closes the channel } }如果一个异常没有被捕获会发生什么?每一个Channel都有一个与之相关的ChannelPipeline,ChannelPipeline持有一连串的ChannelHandler实例。一个hanlder会把handler方法的调用传递给(handler链中的)下一个handler,因此如果exceptionCaught()没有在链中的某个地方实现,接收到的异常就会传送到ChannelPipeline的末端,然后会被记录(log)。因此,你的应用应该提供至少一个实现了exceptionCaught()的ChannelHandler。
记住下面几点:
ChannelHandlers能被不同类型的事件调用应用程序实现或继承ChannelHandlers来介入事件的生命周期同时提供自定义的应用业务逻辑ChannelHandlers让你的业务逻辑代码与网络编程代码解耦启动(Bootstrapping)服务器 启动服务器会涉及到以下两点:
绑定服务器监听的端口然后接受(accept)连接请求配置channel来通知一个EchoServerHandler实例输入的信息下面给出EchoServer类的实现:
package com.netty.ch2; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.net.InetSocketAddress; public class EchoServer { private final int port; public EchoServer(int port){ this.port = port; } public static void main(String[] args) throws Exception { if(args.length != 1){ args = new String[]{"8000"}; } int port = Integer.parseInt(args[0]); new EchoServer(port).start(); } public void start() throws Exception{ final EchoServerHandler serverHandler = new EchoServerHandler(); EventLoopGroup group = new NioEventLoopGroup(); try{ ServerBootstrap b = new ServerBootstrap(); b.group(group) .channel(NioServerSocketChannel.class)//表示使用NIO transport Channel .localAddress(new InetSocketAddress(port))//绑定端口号 //当一个新连接进来,一个新的child channel会被创建 //ChannelInitializer会将EchoServerHandler的一个实例增加到pipeline .childHandler(new ChannelInitializer<SocketChannel>() { //增加一个EchoServerHandler到Channel的pipeline protected void initChannel(SocketChannel ch) throws Exception { //EchoServerHandler是@Sharable的,因此我们可以一直使用同一个 ch.pipeline().addLast(serverHandler); } }); ChannelFuture f = b.bind().sync();//异步地绑定服务器,调用sync()将异步变成同步,同步地等待绑定完成 f.channel().closeFuture().sync();//得到channel的closeFuture同时阻塞当前的线程直到closeFuture完成 }finally { //关闭EventLoopGroup,释放所有的资源 group.shutdownGracefully().sync(); } } }回顾一下实现服务器的重要步骤:
EchoServerHandler实现了业务逻辑main()方法启动了服务器启动服务器的步骤是:
创建一个ServerBootstrap实例来启动和绑定服务器创建和分配一个NioEventLoopGroup实例来处理各种事件(如:新连接进入和读写数据等)指定服务器绑定的本地InetSocketAddress用EchoServerHandler实例来实例化每一个新的Channel调用ServerBootstrap.bind()方法来绑定服务器到此为止服务器已经被初始化了然后可以开始提供服务了。下面我们来实现客户端的代码
Echo客户端会:
连接服务器发送消息发送的每一条消息,等待相应的回复关闭连接 客户端也需要实现业务逻辑和启动(bootstrapping)代码通过ChannelHandlers实现客户端业务逻辑 客户端通过ChannelInboundHandler来处理数据。可以通过继承SimpleChannelInboundHandler来实现下面需要的方法:
channelActive() - 当与服务端的连接建立成功后被调用channelRead0() - 收到来自服务器的消息时调用exceptionCaught() - 如果处理的过程中抛出异常被调用 package com.netty.ch2; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; @ChannelHandler.Sharable public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf>{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //当知悉channel被激活后 发送一条信息 ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); } protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception { System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }首先,重写了channelActive(),当连接建立后会调用这个方法。 接下来重写了channelRead0()方法,当收到的数据后会回调这个方法。注意可能以块(chunks)的形式接收服务器发送来的数据。如果服务器发送了5字节的数据,但是无法保证所有的5个字节会被一次接收。甚至这么小的数据(才5字节)都可能导致这个方法被调用两次,第一次一个ByteBuf(Netty的字节容器)装载了3字节的数据,第二次装载了2字节的数据。因为面向流的协议,比如TCP,能保证这些字节以它们被(服务器)发送的顺序接收。
SimpleChannelInboundHandler vs. ChannelInboundHandler 在客户端(客户端继承SimpleChannelInboundHandler),当channelRead0()方法完成时,你已经取得了要接收的消息,当这个方法返回时,SimpleChannelInboundHandler会释放持有消息的ByteBuf所引用的相关内存;而在服务端,你仍然需要通过异步的write()方法将受到的消息返回给发送者,这个write()方法在channelRead()方法返回前可能还没有将消息写完,此时不应该释放这个消息所占的资源。
启动客户端
package com.netty.ch2; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; /** * Created by greyfoss on 2017/9/1. */ public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try{ Bootstrap b = new Bootstrap(); b.group(group)//指定EventLoopGroup来处理客户端消息 .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(host,port)) //当Channel创建时将EchoClientHandler增加到pipeline .handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoClientHandler()); } }); ChannelFuture f = b.connect().sync(); f.channel().closeFuture().sync(); }finally { group.shutdownGracefully().sync(); } } public static void main(String[] args) throws InterruptedException { if(args.length != 2){ args = new String []{"localhost","8000"}; } String host = args[0]; int port = Integer.parseInt(args[1]); new EchoClient(host,port).start(); } }像服务端一样,使用NIO transport。 回顾一下创建客户端的要点:
创建一个Bootstrap实例来初始化客户端分配NioEventLoopGroup实例来处理事件(包括创建新连接,处理读写数据)为connnection创建一个InetSocketAddress实例来连接服务器当连接建立好后安装一个EchoClientHandler到pipeline接下来调用Bootstrap.connect()来连接到远程的服务器