nio实战之netty实现

xiaoxiao2021-02-28  49

netty 简介

Netty是由JBOSS提供的一个Java开源框架。它提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。也就是说,Netty是一个基于NIO的客户端/服务器端编程框架,它极大地简化了网络应用的编程开发过程,例如TCP和UDP的socket服务开发。Netty吸收了多种协议(包括FTP、SMTP、HTTP以及各种二进制、文本协议)的实现经验,在保证易于开发的同时,还保证了应用的性能、稳定性和伸缩性。

netty使用示例

服务端代码

public class TimeServer { public static void main(String[] args) throws Exception { new TimeServer().bind(8086); } public void bind(int port) throws Exception { //配置服务端的nio线程组 EventLoopGroup bossGroup=new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ ServerBootstrap b=new ServerBootstrap(); b.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,1024) .childHandler(new ChildChannelHandler()); //绑定端口,同步等待成功 ChannelFuture f=b.bind(port).sync(); //等待服务端监听端口关闭 f.channel().closeFuture().sync(); }finally { { //释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{ @Override protected void initChannel(SocketChannel channel) throws Exception { // channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer("\n".getBytes()))); channel.pipeline().addLast(new FixedLengthFrameDecoder(17)); channel.pipeline().addLast(new StringDecoder()); // channel.pipeline().addLast(new TimeServerHandler()); channel.pipeline().addLast(new ChannelHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(msg); byte[] result=("server:"+msg+System.getProperty("line.separator")).getBytes(); ByteBuf buf= Unpooled.copiedBuffer(result); ctx.writeAndFlush(buf); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }); } } }

客户端代码

public class TimeClient { public static void main(String[] args) throws InterruptedException { new TimeClient().connect(8086,"127.0.0.1"); } public void connect(int port,String host) throws InterruptedException { //配置客户端nio线程组 EventLoopGroup group=new NioEventLoopGroup(); try{ Bootstrap b=new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY,true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer("\n".getBytes()))); channel.pipeline().addLast(new StringDecoder()); channel.pipeline().addLast(new ChannelHandlerAdapter(){ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("enter"); BufferedReader br=new BufferedReader(new InputStreamReader(System.in)); String line=br.readLine(); System.out.println("enter1:"+line); while(!"EOF".equals(line)){ byte[] bytes=(line+System.getProperty("line.separator")).getBytes(); ByteBuf buf=Unpooled.buffer(bytes.length); buf.writeBytes(bytes); ctx.writeAndFlush(buf); line=br.readLine(); System.out.println("enter1:"+line); } // byte[] bytes=("QUERY TIME ORDER"+System.getProperty("line.separator")).getBytes(); // ByteBuf buf=Unpooled.buffer(bytes.length); // buf.writeBytes(bytes); // ctx.writeAndFlush(buf); } @Override public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception { System.out.println(msg); } }); } }); //发起异步连接操作 ChannelFuture f=b.connect(host,port).sync(); //等待客户端链路关闭 f.channel().closeFuture().sync(); }finally { //释放nio线程组 group.shutdownGracefully(); } } }

客户端做的事情是读取用户的输入,然后发送给服务端;由于服务端使用了FixedLengthFrameDecoder(17)这个handler,服务端每累计收到17个字节(包含行分隔符)才会解码出一条消息并作出响应,因此输入内容要凑满17个字节,才能立刻看到服务端的回复。 由示例可以看出,我们能够做的和需要做的就是自定义内容处理器。这就像一个服务,比如tomcat服务,容器本身我们是无法改变的,我们能做的就是根据请求提供不同的响应。 示例中也有许多固定的代码,这些是服务的配置项,就像tomcat一样,我们也能通过不同的配置改变它的行为和表现。

看到这里,你可能还是懵的,不知道netty为我们做了哪些工作,现在我们再来看一下直接使用nio的代码。 服务端代码:

/**
 * Single-threaded selector-based server: after each chunk read from a client it sends back
 * one "message from server--N" reply, where N is a counter shared by all clients.
 */
public class NIOServer {
    /** Number of replies sent so far; appended to every reply. */
    private int flag = 0;
    // Size of the read and write buffers.
    private int BLOCK = 4096;
    // Send buffer, reused for every reply.
    private ByteBuffer sendBuffer = ByteBuffer.allocate(BLOCK);
    // Receive buffer, reused for every read.
    private ByteBuffer receiveBuffer = ByteBuffer.allocate(BLOCK);
    private Selector selector;

    /**
     * Opens a non-blocking server channel on {@code port} and registers it for accept events.
     *
     * @param port TCP port to listen on
     * @throws IOException if the channel or the selector cannot be opened or bound
     */
    public NIOServer(int port) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(port));
        selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("server start --- " + port);
    }

    /**
     * Event loop: waits for ready keys and dispatches each one to {@link #handleKey}.
     *
     * @throws IOException if selecting or accepting fails
     */
    private void listen() throws IOException {
        while (true) {
            selector.select();
            Iterator<SelectionKey> itr = selector.selectedKeys().iterator();
            while (itr.hasNext()) {
                SelectionKey selectionKey = itr.next();
                itr.remove();
                handleKey(selectionKey);
            }
        }
    }

    /**
     * Handles one ready key: accepts a connection, reads a request or writes a reply.
     *
     * @param selectionKey the ready key
     * @throws IOException if accepting a connection fails
     */
    private void handleKey(SelectionKey selectionKey) throws IOException {
        if (selectionKey.isAcceptable()) {
            ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
            SocketChannel accepted = server.accept();
            if (accepted == null) {
                // A non-blocking accept() returns null when no connection is pending.
                return;
            }
            accepted.configureBlocking(false);
            accepted.register(selector, SelectionKey.OP_READ);
            return;
        }

        SocketChannel client = (SocketChannel) selectionKey.channel();
        try {
            if (selectionKey.isReadable()) {
                receiveBuffer.clear();
                int count = client.read(receiveBuffer);
                if (count > 0) {
                    String receiveText = new String(receiveBuffer.array(), 0, count);
                    System.out.println("服务端接收客户端数据 -- :" + receiveText);
                    // Switch the key to write interest so the reply is sent next.
                    selectionKey.interestOps(SelectionKey.OP_WRITE);
                } else if (count < 0) {
                    // End of stream: the peer closed the connection. Without closing here the
                    // key stays readable forever and the loop spins on it.
                    closeClient(selectionKey, client);
                }
            } else if (selectionKey.isWritable()) {
                String sendText = "message from server--" + flag++;
                sendBuffer.clear();
                sendBuffer.put(sendText.getBytes());
                sendBuffer.flip();
                client.write(sendBuffer);
                System.out.println("服务端向客户端发送数据--: " + sendText);
                // Switch back to read interest and wait for the next request.
                selectionKey.interestOps(SelectionKey.OP_READ);
            }
        } catch (IOException e) {
            // A failure on one connection (for example a reset) must not stop the server.
            System.err.println("dropping client after I/O error: " + e);
            closeClient(selectionKey, client);
        }
    }

    /** Deregisters and closes one client connection, ignoring errors raised by the close. */
    private void closeClient(SelectionKey selectionKey, SocketChannel client) {
        selectionKey.cancel();
        try {
            client.close();
        } catch (IOException ignored) {
            // The connection is being discarded anyway.
        }
    }

    public static void main(String[] args) throws IOException {
        int port = 8888;
        NIOServer server = new NIOServer(port);
        server.listen();
    }
}

客户端代码:

/**
 * Selector-based client. One thread runs {@link #connect()} and performs all channel I/O;
 * other threads hand content over through {@link #write(String)}.
 */
public class NIOClient {
    private int BLOCK = 4096;
    private ByteBuffer sendBuffer = ByteBuffer.allocate(BLOCK);
    private ByteBuffer receiveBuffer = ByteBuffer.allocate(BLOCK);
    private Selector selector;
    /** The single connection of this client. */
    private final SocketChannel channel;
    /** Content handed over by write(String) that the selector thread has not sent yet. */
    private final java.util.Queue<String> pending =
            new java.util.concurrent.ConcurrentLinkedQueue<String>();

    /**
     * Starts a non-blocking connect to {@code host:port}; it is completed by {@link #connect()}.
     *
     * @param host server host
     * @param port server port
     * @throws IOException if the channel or the selector cannot be opened
     */
    public NIOClient(String host, int port) throws IOException {
        channel = SocketChannel.open();
        channel.configureBlocking(false);
        selector = Selector.open();
        channel.register(selector, SelectionKey.OP_CONNECT);
        channel.connect(new InetSocketAddress(host, port));
    }

    /**
     * Event loop: completes the connection, prints received data and sends queued content.
     * Returns once the channel has been closed.
     *
     * @throws IOException if connecting, reading or writing fails
     */
    public void connect() throws IOException {
        while (channel.isOpen()) {
            selector.select();
            handleKey(selector.selectedKeys());
            flushPending();
        }
    }

    /**
     * Processes the selected keys and removes each one from the set.
     *
     * @param selectionKeys the selector's selected-key set
     * @throws IOException if connecting or reading fails
     */
    public void handleKey(Set<SelectionKey> selectionKeys) throws IOException {
        Iterator<SelectionKey> itr = selectionKeys.iterator();
        while (itr.hasNext()) {
            SelectionKey selectionKey = itr.next();
            itr.remove();
            SocketChannel client = (SocketChannel) selectionKey.channel();
            if (selectionKey.isConnectable()) {
                System.out.println("client connect");
                if (client.isConnectionPending()) {
                    if (!client.finishConnect()) {
                        // Not connected yet; keep waiting for the connect event.
                        continue;
                    }
                    System.out.println("完成连接!");
                    // Greeting sent as the very first data on the connection.
                    sendBuffer.clear();
                    sendBuffer.put("Hello,Server".getBytes());
                    sendBuffer.flip();
                    writeFully(client, sendBuffer);
                }
                selectionKey.interestOps(SelectionKey.OP_READ);
            } else if (selectionKey.isReadable()) {
                receiveBuffer.clear();
                int count = client.read(receiveBuffer);
                if (count > 0) {
                    String receiveText = new String(receiveBuffer.array(), 0, count);
                    System.out.println("客户端接受服务端数据--:" + receiveText);
                } else if (count < 0) {
                    // End of stream: the server closed the connection.
                    selectionKey.cancel();
                    client.close();
                }
            }
        }
    }

    /**
     * Queues {@code content} for sending and wakes the selector thread, which performs the
     * actual write. The content is therefore sent asynchronously, once the connection is up.
     *
     * @param content text to send
     * @throws IOException never thrown by this implementation; kept for callers
     */
    public void write(String content) throws IOException {
        pending.add(content);
        // If the selector thread is not currently selecting, its next select() returns at once.
        selector.wakeup();
    }

    /** Sends everything queued by write(String); does nothing until the channel is connected. */
    private void flushPending() throws IOException {
        if (!channel.isConnected()) {
            return;
        }
        String content;
        while ((content = pending.poll()) != null) {
            // wrap() yields a buffer that is already positioned for reading, so no flip() is
            // needed; a second flip() would set the limit to 0 and send nothing.
            writeFully(channel, ByteBuffer.wrap(content.getBytes()));
            System.out.println("客户端向服务端发送数据 --:" + content);
        }
    }

    /** Writes the remaining bytes of {@code buffer}, retrying until the buffer is drained. */
    private static void writeFully(SocketChannel target, ByteBuffer buffer) throws IOException {
        while (buffer.hasRemaining()) {
            target.write(buffer);
        }
    }

    public static void main(String[] args) throws IOException {
        final NIOClient client = new NIOClient("127.0.0.1", 8888);
        new Thread(new Runnable() {
            public void run() {
                try {
                    client.connect();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        client.write("test1testtest");
        try {
            Thread.sleep(100000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
        System.out.println("dd");
    }
}

相比较上面的代码,直接使用nio就多了许多细节,许多直接与io交互的代码,比如连接事件、读事件、写事件;还有读写模型、编解码的流程设计,这些也都需要自己去处理。 看到这里,你应该就明白了,netty就是对nio的一层封装,提供了一套模板,我们通过配置就可以得到不同的线程模型,权衡好吞吐量、响应度等指标。

reactor 线程模型

reactor线程模型有单线程、多线程、主从reactor三种。netty通过配置可以灵活使用这三种模型;推荐使用第三种模型,它适合高并发、高负载的情况。

小编开始也很疑惑为什么要多一个subReactor,但是请仔细看,主从reactor模型中的acceptor是单向的,而普通reactor模型是双向的,这说明主从reactor中mainReactor只做接收请求以及分发的事情,接收到之后的事情都扔给subReactor处理。比如监听多个端口的情形,每个subReactor监听一个端口。

netty框架实现(单reactor多线程,长连接版)

下面谈一下netty框架的实现,主要以单reactor多线程这个模型作为示例。 其流程如下:

下面贴代码,这个例子是服务端和客户端可以随时进行自由读写的长连接示例(主要部分):

公共模板类:

/** * nio 操作模板类 * server 与 client 的基类 * Created by guzy on 16/9/18. */ public abstract class NioTemplate { Logger logger=Logger.getLogger(NioTemplate.class); protected Selector selector; /** * 主 channel */ SelectableChannel channel; //TODO:可以考虑拿掉 /** * 内容处理链 */ private List<ContentHandler> contentHandlers=new ArrayList<ContentHandler>(); /** * 是否需要继续运行 */ protected volatile Boolean running=true; /** * 展示名称 */ String name; //worker 线程,用来处理数据内容 private ExecutorService executorService; public NioTemplate(String name){ this.name=name; executorService= Executors.newFixedThreadPool(10); } public NioTemplate addContentHandler(ContentHandler contentHandler){ contentHandlers.add(contentHandler); return this; } /** * 客户端注册(登录)事件处理 * @param selectionKey * @param results */ abstract void handleFirstConnect(SelectionKey selectionKey,List<Object> results); /** * 处理关闭事件 * @param selectionKey */ abstract void handleClose(SelectionKey selectionKey); /** * 写掉待写的数据 */ abstract void handleNotWritten(); abstract void shutDown(); abstract void handleKey(SelectionKey selectionKey) throws IOException; /** * 启动方法 * @throws IOException */ public void start(){ if(!running){ return; } try{ while(running){ //registerSelectionKey();//注册写兴趣 handleNotWritten(); int count=selector.select(); if(count>0){ final Set<SelectionKey> selectionKeys= selector.selectedKeys(); for(final SelectionKey selectionKey:selectionKeys){ handleKey(selectionKey); } selectionKeys.clear(); } } }catch (Exception e){ e.printStackTrace(); }finally { shutDown(); } } /** * 读事件的处理 * @param selectionKey * @throws IOException */ void handleReadable(final SelectionKey selectionKey) throws IOException { //TODO:扩容,并发 final ByteBuffer receiveBuffer = selectionKey.attachment()==null?ByteBuffer.allocate(1024):(ByteBuffer)selectionKey.attachment(); final SocketChannel channel = (SocketChannel) selectionKey.channel(); //读取数据 int count = channel.read(receiveBuffer); if (count > 0) { executorService.execute(new Runnable() { public void 
run() { receiveBuffer.flip(); List<Object> results = new ArrayList<Object>(); results.add(receiveBuffer); for (ContentHandler handler : contentHandlers) { List<Object> outs = new ArrayList<Object>(); Iterator resultItr = results.iterator(); while (resultItr.hasNext()) { Object curResult = resultItr.next(); handler.read(channel, curResult, outs); } results = outs; } handleFirstConnect(selectionKey,results); for (Object curResult : results) { logger.debug(name + "接收数据:" + new String((byte[]) curResult)); } } }); channel.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ); //TODO:可能要改成在这里注册写事件 } else if (count < 0) { //对端链路关闭 selectionKey.cancel(); channel.close(); handleClose(selectionKey); } else { //读到0字节,忽略 } } /** * 写内容 * @param channel * @param content * @throws IOException */ void writeContent(final ByteBuffer attach,final SocketChannel channel,final Object content) { executorService.submit(new Callable<List<Object>>() { List<Object> results = new ArrayList<Object>(); public List<Object> call() throws IOException { results.add(content); logger.debug(name + "发送:" + content); for (ContentHandler handler : contentHandlers) { List<Object> outs = new ArrayList<Object>(); for (Object result : results) { handler.write(attach,channel, result, outs); } results = outs; } if(attach!=null){//此时都已经直接写到attach中了 writeContent(channel,attach); }else{ for(Object result:results){ writeContent(channel,(ByteBuffer)result); } } return results; } }); } /** * 底层的写内容方法 * @param socketChannel * @param sendBuffer * @throws IOException */ void writeContent(SocketChannel socketChannel,ByteBuffer sendBuffer) throws IOException { sendBuffer.flip(); while(sendBuffer.hasRemaining()){ socketChannel.write(sendBuffer); } } }

服务类:

/** * 服务端公共类 * Created by guzy on 16/9/20. */ public class NioServer extends NioTemplate { Logger logger=Logger.getLogger(NioServer.class); private SelectionKeys selectionKeys=new SelectionKeys(); private SocketChannelArr socketChannelArr=new SocketChannelArr(); /** * 主机-昵称的对应关系 */ ConcurrentHashMap<String,Queue<Object>> hostNickMap=new ConcurrentHashMap<String, Queue<Object>>(); /** * 要通过selectionKey 昵称 写的内容 */ ConcurrentHashMap<String,Queue<Object>> toWriteMap=new ConcurrentHashMap<String, Queue<Object>>(); /** * 要通过channel host 写的内容 */ ConcurrentHashMap<String,Queue<Object>> toWriteMap4Cha=new ConcurrentHashMap<String, Queue<Object>>(); public NioServer(int port, String name) throws IOException { super(name); logger.debug(String.format("serverName:%s",name)); ServerSocketChannel serverSocketChannel= ServerSocketChannel.open(); channel=serverSocketChannel; serverSocketChannel.configureBlocking(false); ServerSocket serverSocket=serverSocketChannel.socket(); serverSocket.bind(new InetSocketAddress(port)); selector= Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); logger.debug("server start --- " + port); } @Override void handleKey(final SelectionKey selectionKey) throws IOException { if (selectionKey.isAcceptable()) { try { handleConnect(selectionKey); } catch (IOException e) { logger.error(e); e.printStackTrace(); } return; } if (selectionKey.isReadable()) { handleReadable(selectionKey); } } void handleFirstConnect(SelectionKey selectionKey,List<Object> results){ logger.debug("enter handleFirst"); if(selectionKeys.containsValue(selectionKey)){ logger.debug("already has selectionKey"); return; } selectionKeys.addSelectionKey(CommonUtils.getSocketName((SocketChannel)selectionKey.channel()),selectionKey); for(Object result:results){ String res=new String((byte[])result); if(res.startsWith("MyName:")){ String name=res.substring(7); addObjToMap(CommonUtils.getSocketName((SocketChannel)selectionKey.channel()),name,hostNickMap); 
logger.debug("client name:"+name); selectionKeys.addSelectionKey(name,selectionKey); return;//?? } } } /** * 处理关闭事件 * @param selectionKey */ void handleClose(SelectionKey selectionKey){ SocketChannel socketChannel=(SocketChannel)selectionKey.channel(); logger.debug(String.format("before close:%s",socketChannelArr.getMap())); socketChannelArr.remove(socketChannel); selectionKeys.remove(CommonUtils.getSocketName((SocketChannel)selectionKey.channel())); logger.debug(String.format("after close:%s",socketChannelArr.getMap())); String name=CommonUtils.getSocketName(socketChannel); logger.debug(String.format("%s close",name)); Queue nicks=hostNickMap.get(name); if(nicks!=null && nicks.size()>0){ logger.debug(String.format("before selectionKeys:%s",selectionKeys.getMap())); for(Object nick:nicks){ if(selectionKeys.getSelectionKey((String)nick).equals(selectionKey)){ selectionKeys.remove((String)nick); } } logger.debug(String.format("after selectionKeys:%s",selectionKeys.getMap())); } } // // void handleClose(SelectionKey selectionKey){ // // SocketChannel socketChannel=(SocketChannel)selectionKey.channel(); // logger.debug(String.format("before close:%d",socketChannelArr.size())); // socketChannelArr.remove(socketChannel); // logger.debug(String.format("after close:%d",socketChannelArr.size())); // // String name=CommonUtils.getSocketName(socketChannel); // logger.debug(String.format("%s close",name)); // Queue nicks=hostNickMap.get(name); // if(nicks!=null && nicks.size()>0){ // logger.debug(String.format("before selectionKeys:%s",selectionKeys.getMap())); // for(Object nick:nicks){ // selectionKeys.remove((String)nick); // } // logger.debug(String.format("after selectionKeys:%s",selectionKeys.getMap())); // } // } void handleConnect(SelectionKey selectionKey) throws IOException { ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel(); final SocketChannel client = server.accept(); client.configureBlocking(false); client.register(selector, 
SelectionKey.OP_READ|SelectionKey.OP_WRITE); ByteBuffer attach=ByteBuffer.allocate(1024); selectionKey.attach(attach); // channels.addChannel(client); // final String clientName=client.socket().getInetAddress().getHostName(); socketChannelArr.add(client); // selectionKey.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ); //selectionKeys.addSelectionKey(clientName,selectionKey); //write(clientName, "test server"); writeContent(attach,client,"服务端连接反馈!"); // channel.register(selector, SelectionKey.OP_WRITE); } // Map<String,List<Object>> filterAndCreateMap(Set<SelectionKey> selectionKeys){ // Map<String,List<Object>> resultMap=new HashMap<String, List<Object>>(); // for(SelectionKey selectionKey:selectionKeys){ // if(selectionKey.isWritable()){ // SocketChannel socketChannel=(SocketChannel)selectionKey.channel(); // String clientName=socketChannel.socket().getInetAddress().getHostName(); // if(toWriteMap.containsKey(clientName)){ // toWriteMap.remove(clientName); // resultMap.put(clientName,toWriteMap.get(clientName)); // } // } // } // return resultMap; // } void shutDown(){ if(running){ //start(); } } public void shutDownReally() throws IOException { logger.debug("shut down really"); running=false; selector.close(); channel.close(); } public void write(String name,Object o) throws IOException { if(selectionKeys.containsKey(name)){ addObjToMap(name,o,toWriteMap); }else{ addObjToMap(name,o,toWriteMap4Cha); } handleNotWritten(); // String clientName=channel.socket().getInetAddress().getHostName(); // List<Object> toWrites=map.get(clientName); // if(toWrites!=null){ // map.remove(clientName); // for(Object o:toWrites){ // writeContent(selectionKey,channel,o); // } // toWrites.clear(); // } // channel.register(selector, SelectionKey.OP_READ); } synchronized void handleNotWritten() { for(Map.Entry<String,Queue<Object>> toWrite:toWriteMap.entrySet()){ Queue<Object> list=toWrite.getValue(); if(list!=null && list.size()>0){ // SocketChannel 
socketChannel=channels.getChannel(toWrite.getKey()); SelectionKey key=selectionKeys.getSelectionKey(toWrite.getKey()); if(key==null){ logger.error(String.format("%s selectionKey is null",toWrite.getKey())); continue; } SocketChannel socketChannel=(SocketChannel)key.channel(); if(socketChannel!=null && socketChannel.isConnected()){ toWriteMap.remove(toWrite.getKey()); for(Object o1:list){ writeContent((ByteBuffer)key.attachment(),socketChannel,o1); } } // list.clear(); } } for(Map.Entry<String,Queue<Object>> toWrite:toWriteMap4Cha.entrySet()){ Queue<Object> list=toWrite.getValue(); if(list!=null && list.size()>0){ Collection<SocketChannel> socketChannels=socketChannelArr.get(toWrite.getKey()); if(socketChannels==null || socketChannels.size()==0){ logger.error(String.format("%s socketChannels is empty,%s",toWrite.getKey(),socketChannels)); continue;//此时如果remove就可以解决那种不断轮询这个主机的情况 } SelectionKey key=selectionKeys.getSelectionKey(toWrite.getKey()); if(key==null){ logger.error(String.format("%s selectionKey is null",toWrite.getKey())); continue; } toWriteMap4Cha.remove(toWrite.getKey());//先移除,保证不会在此时再被其他线程写入 for(SocketChannel socketChannel:socketChannels){ if(socketChannel!=null && socketChannel.isConnected()){ for(Object o1:list){ writeContent((ByteBuffer)key.attachment(),socketChannel,o1); } } } //list.clear(); } } } private void addObjToMap(String name,Object o,Map<String,Queue<Object>> map){ Queue<Object> toWrites=map.get(name); if(toWrites==null){ toWrites=new ArrayBlockingQueue<Object>(100); Queue<Object> queue=map.putIfAbsent(name,toWrites); if(queue!=null){ toWrites=queue; } } toWrites.add(o); } }

客户端类:

/** * 公共的客户端 * Created by guzy on 16/9/20. */ public class NioClient extends NioTemplate { Logger logger=Logger.getLogger(NioClient.class); //客户端channel private SocketChannel socketChannel; /** * 连接完成的处理器 */ CompleteHandler completeHandler; //用于写的内容列表 private Queue<Object> toWrites=new ArrayBlockingQueue<Object>(100); public NioClient setCompleteHandler(CompleteHandler completeHandler) { this.completeHandler = completeHandler; return this; } public NioClient(String host, int port, String name) throws IOException { super(name); socketChannel=SocketChannel.open(); socketChannel.configureBlocking(false); selector= Selector.open(); // socketChannel.socket().setTcpNoDelay(true); socketChannel.register(selector, SelectionKey.OP_CONNECT);//注册连接服务器兴趣 socketChannel.connect(new InetSocketAddress(host, port));//要连接的服务器 this.channel=socketChannel; } /** * 对selectionKey的处理 * @param selectionKey */ void handleKey(final SelectionKey selectionKey) throws IOException { final SocketChannel channel = (SocketChannel) selectionKey.channel(); if (selectionKey.isConnectable()) { if (channel.isConnectionPending()) { channel.finishConnect(); logger.debug(name + "完成连接!"); if(completeHandler!=null){ completeHandler.handle(channel); } } channel.register(selector, SelectionKey.OP_READ); return; } if (selectionKey.isReadable()) { handleReadable(selectionKey); } // if(selectionKey.isWritable() && !toWrites.isEmpty()){ // for(Object o:toWrites){ // writeContent(selectionKey,socketChannel,o); // } // toWrites.clear(); // channel.register(selector, SelectionKey.OP_READ); // } } public void write(Object o) throws IOException { toWrites.add(o); handleNotWritten(); } /** * 将所有需要写的数据一次性写掉 */ synchronized void handleNotWritten() { if(socketChannel.isConnected()){ for(Object toWrite:toWrites){ writeContent(null,socketChannel,toWrite); toWrites.remove(toWrite); } // toWrites.clear(); //channel.register(selector, SelectionKey.OP_READ); } } /** * 处理关闭事件 * @param selectionKey */ void handleClose(SelectionKey 
selectionKey){ logger.debug("server closed!"); shutDown(); } /** * 关闭selector 和 channel * @throws IOException */ void shutDown() { try { selector.close(); channel.close(); } catch (IOException e) { e.printStackTrace(); } } void handleFirstConnect(SelectionKey selectionKey,List<Object> results){ } }

服务端测试类:

public class ServerTest { public static void main(String[] args) throws IOException{ int port = 8889; final NioServer commonNIOServer=new NioServer(port,"server"); commonNIOServer.addContentHandler(new HalfContentHandler()); // new Thread(new Runnable() { // public void run() { // try { // // try { // Thread.sleep(7000l); // } catch (InterruptedException e) { // e.printStackTrace(); // } // System.out.println("begin write "); // commonNIOServer.write("localhost","test_"); // commonNIOServer.write("localhost","test_1"); // // } catch (IOException e) { // e.printStackTrace(); // } // } // }).start(); new Thread(new Runnable() { public void run() { new ReadInput().read(new HandleStr() { public void handleStr(String str) throws Exception { try{ String strs[]=str.split(" "); commonNIOServer.write(strs[0],strs[1]); System.out.println(str); }catch (Exception e){ e.printStackTrace(); } } }); } }).start(); Runtime.getRuntime().addShutdownHook(new Thread(){ public void run(){ try { //System.out.println("test code"); commonNIOServer.shutDownReally(); } catch (Exception e) { e.printStackTrace(); } } }); commonNIOServer.start(); } }

客户端测试类:

public class ClientTest { public static void main(String[] args) throws IOException { final NioClient client=new NioClient("127.0.0.1",8889,"client"); client.setCompleteHandler(new CompleteHandler() { public void handle(SocketChannel socketChannel) throws IOException { //首次写数据 client.write("MyName:ss"); } })//连接成功处理器 .addContentHandler(new HalfContentHandler());//增加内容过滤器 new Thread(new Runnable() { public void run() { client.start();//启动进程 } }).start(); // try { // Thread.sleep(2000l); // } catch (InterruptedException e) { // e.printStackTrace(); // } new Thread(new Runnable() { public void run() { new ReadInput().read(new HandleStr() { public void handleStr(String str) throws Exception { client.write(str); //System.out.println(str); } }); } }).start(); // new Thread(new Runnable() { // public void run() { // try { // System.out.println("begin write"); // client.write("te"); // client.write("ted"); // client.write("testt"); // // } catch (IOException e) { // e.printStackTrace(); // } // } // }).start(); } }

运行效果如下: ClientTest:

2017-06-07 14:15:11.489 [com.netty.NioClient]-[DEBUG] client完成连接! 2017-06-07 14:15:11.495 [com.netty.NioTemplate$2]-[DEBUG] client发送:MyName:ss 2017-06-07 14:15:11.500 [com.netty.handlers.HalfContentHandler]-[DEBUG] before write,outs:0 2017-06-07 14:15:11.503 [com.netty.handlers.HalfContentHandler]-[DEBUG] after write,outs:1 2017-06-07 14:15:11.518 [com.netty.handlers.HalfContentHandler]-[DEBUG] before read,outs:0 2017-06-07 14:15:11.519 [com.netty.handlers.HalfContentHandler]-[DEBUG] after read,outs:1 2017-06-07 14:15:11.519 [com.netty.NioTemplate$1]-[DEBUG] client接收数据:服务端连接反馈! 2017-06-07 14:15:17.559 [com.netty.handlers.HalfContentHandler]-[DEBUG] before read,outs:0 2017-06-07 14:15:17.560 [com.netty.handlers.HalfContentHandler]-[DEBUG] after read,outs:1 2017-06-07 14:15:17.560 [com.netty.NioTemplate$1]-[DEBUG] client接收数据:dd 2017-06-07 14:15:20.784 [com.netty.handlers.HalfContentHandler]-[DEBUG] before read,outs:0 2017-06-07 14:15:20.784 [com.netty.handlers.HalfContentHandler]-[DEBUG] after read,outs:1 2017-06-07 14:15:20.784 [com.netty.NioTemplate$1]-[DEBUG] client接收数据:dfafda lll 2017-06-07 14:15:27.555 [com.netty.NioTemplate$2]-[DEBUG] client发送:lll 2017-06-07 14:15:27.556 [com.netty.handlers.HalfContentHandler]-[DEBUG] before write,outs:0 2017-06-07 14:15:27.556 [com.netty.handlers.HalfContentHandler]-[DEBUG] after write,outs:1

ServerTest:

2017-06-07 14:15:07.519 [com.netty.NioServer]-[DEBUG] serverName:server 2017-06-07 14:15:07.566 [com.netty.NioServer]-[DEBUG] server start --- 8889 2017-06-07 14:15:11.495 [com.netty.NioTemplate$2]-[DEBUG] server发送:服务端连接反馈! 2017-06-07 14:15:11.496 [com.netty.handlers.HalfContentHandler]-[DEBUG] before write,outs:0 2017-06-07 14:15:11.497 [com.netty.handlers.HalfContentHandler]-[DEBUG] after write,outs:1 2017-06-07 14:15:11.511 [com.netty.handlers.HalfContentHandler]-[DEBUG] before read,outs:0 2017-06-07 14:15:11.512 [com.netty.handlers.HalfContentHandler]-[DEBUG] after read,outs:1 2017-06-07 14:15:11.512 [com.netty.NioServer]-[DEBUG] enter handleFirst 2017-06-07 14:15:11.512 [com.netty.NioServer]-[DEBUG] client name:ss 2017-06-07 14:15:11.513 [com.netty.NioTemplate$1]-[DEBUG] server接收数据:MyName:ss ss dd ss dd 2017-06-07 14:15:17.554 [com.netty.NioTemplate$2]-[DEBUG] server发送:dd 2017-06-07 14:15:17.554 [com.netty.handlers.HalfContentHandler]-[DEBUG] before write,outs:0 2017-06-07 14:15:17.554 [com.netty.handlers.HalfContentHandler]-[DEBUG] after write,outs:1 localhost dfafda localhost dfafda 2017-06-07 14:15:20.779 [com.netty.NioTemplate$2]-[DEBUG] server发送:dfafda 2017-06-07 14:15:20.780 [com.netty.handlers.HalfContentHandler]-[DEBUG] before write,outs:0 2017-06-07 14:15:20.780 [com.netty.handlers.HalfContentHandler]-[DEBUG] after write,outs:1 2017-06-07 14:15:27.560 [com.netty.handlers.HalfContentHandler]-[DEBUG] before read,outs:0 2017-06-07 14:15:27.560 [com.netty.handlers.HalfContentHandler]-[DEBUG] after read,outs:1 2017-06-07 14:15:27.560 [com.netty.NioServer]-[DEBUG] enter handleFirst 2017-06-07 14:15:27.560 [com.netty.NioServer]-[DEBUG] already has selectionKey 2017-06-07 14:15:27.561 [com.netty.NioTemplate$1]-[DEBUG] server接收数据:lll
转载请注明原文地址: https://www.6miu.com/read-42867.html

最新回复(0)