本例实现功能为客户端获取服务器的时间并显示,参考《Netty权威指南》一书,不过书中使用Netty5实现的,考虑到Netty已经下架5,本例子使用Netty4.1.6版本。 首先服务器端,包括TimeServer、TimeServerHandler两个类: TimeServer
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class TimeServer { public void bind(int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); try { bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChildChannelHandler()); //绑定端口, 同步等待成功; ChannelFuture future = bootstrap.bind(port).sync(); //等待服务端监听端口关闭 future.channel().closeFuture().sync(); } finally { //优雅关闭 线程组 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("timeServerHandler",new TimeServerHandler()); } } public static void main(String[] args) throws Exception { int port = 443; new TimeServer().bind(port); } }TimeServerHandler
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class TimeServerHandler extends SimpleChannelInboundHandler<Object> { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Server start read"); ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("The time server receive order : " + body); String currentTime = "Query Time Order".equalsIgnoreCase(body) ? new java.util.Date( System.currentTimeMillis()).toString() : "Bad Order"; //异步发送应答消息给客户端: 这里并没有把消息直接写入SocketChannel,而是放入发送缓冲数组中 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.write(resp); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }客户端,包括TimeClient和TimeClientHandler两个类: TimeClient
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class TimeClient { public void connect(int port, String host) throws Exception{ //配置客户端NIO 线程组 EventLoopGroup group = new NioEventLoopGroup(); Bootstrap client = new Bootstrap(); try { client.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }); //绑定端口, 异步连接操作 ChannelFuture future = client.connect(host, port).sync(); //等待客户端连接端口关闭 future.channel().closeFuture().sync(); } finally { //优雅关闭 线程组 group.shutdownGracefully(); } } public static void main(String[] args) { int port = 443; TimeClient client = new TimeClient(); try { client.connect(port, "127.0.0.1"); } catch (Exception e) { e.printStackTrace(); } } }TimeClientHandler
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class TimeClientHandler extends ChannelInboundHandlerAdapter{ private final ByteBuf firstMSG; public TimeClientHandler() { byte[] req = "QUERY TIME ORDER".getBytes(); firstMSG = Unpooled.buffer(req.length); firstMSG.writeBytes(req); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(firstMSG); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf)msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("NOW is: " + body); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }启动顺序,先启动服务端,再启动客户端,然后再服务器和客户端分别达到的输出如下: 服务端: Server start read The time server receive order : QUERY TIME ORDER 客户端: NOW is: Thu Jun 08 11:08:06 CST 2017