NIO编程 TimeServer && TimeClient

xiaoxiao2021-02-28  86

本程序是《Netty权威指南》中 NIO 编程一章的一个小程序,分为 Server 和 Client 两部分。

1,TimeServer的启动程序

/** * Created by ju on 2017-06-06. */ // nio 时间服务器 public class TimeServer { public static void main(String[] args) { int port = 9999; if(args !=null && args.length > 0){ try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { e.printStackTrace(); } } MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port); new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start(); } }

2,时间服务器 NIO 的具体实现 MultiplexerTimeServer

/**
 * Created by ju on 2017-06-06.
 *
 * <p>Single-threaded NIO time server. One {@link Selector} multiplexes the
 * listening channel and every accepted client channel. A client that sends
 * the text {@code QUERY TIME ORDER} (case-insensitive) receives the current
 * date string; any other text receives {@code BAD ORDER}.
 *
 * <p>The selection loop in {@link #run()} polls with a one second timeout, so
 * a call to {@link #stop()} takes effect within roughly one second.
 */
public class MultiplexerTimeServer implements Runnable {

    /** Charset used for both decoding requests and encoding responses. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    private Selector selector;

    private ServerSocketChannel servChannel;

    private volatile boolean stop;

    /**
     * Opens the selector and the non-blocking listening channel, binds it to
     * {@code port} and registers it for accept events.
     *
     * <p>If any of these steps fails, the stack trace is printed and the JVM
     * exits with status 1.
     *
     * @param port TCP port to listen on
     */
    public MultiplexerTimeServer(int port) {
        try {
            selector = Selector.open();
            servChannel = ServerSocketChannel.open();
            servChannel.configureBlocking(false);
            servChannel.socket().bind(new InetSocketAddress(port));
            servChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("Time Server is start in port " + port);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /** Asks the selection loop to terminate; it is observed on the next loop iteration. */
    public void stop() {
        this.stop = true;
    }

    /**
     * Runs the selection loop until {@link #stop()} is called, then closes
     * every registered channel and the selector.
     */
    @Override
    public void run() {
        while (!stop) {
            try {
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        // Report the failure instead of swallowing it, then drop the channel.
                        e.printStackTrace();
                        closeKey(key);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        if (selector != null) {
            // Closing a selector only deregisters its channels; close them explicitly
            // so the listening socket and client sockets are released.
            for (SelectionKey key : selector.keys()) {
                closeKey(key);
            }
            try {
                System.out.println("selector not null");
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Cancels {@code key} and closes its channel, printing (not propagating) any close failure. */
    private void closeKey(SelectionKey key) {
        key.cancel();
        if (key.channel() != null) {
            try {
                key.channel().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Dispatches a ready key: accepts a new connection or reads a request.
     *
     * @param key the selected key
     * @throws IOException if accepting, reading or writing fails
     */
    private void handleInput(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            acceptClient(key);
        }
        if (key.isReadable()) {
            readRequest(key);
        }
    }

    /** Accepts a pending connection and registers it with the selector for read events. */
    private void acceptClient(SelectionKey key) throws IOException {
        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        SocketChannel sc = ssc.accept();
        if (sc == null) {
            // A non-blocking accept returns null when no connection is pending any more.
            return;
        }
        try {
            sc.configureBlocking(false);
            sc.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            // Failing to set up one client must not take down the listening channel.
            e.printStackTrace();
            sc.close();
        }
    }

    /** Reads one request from the client channel and answers it; closes the channel on end-of-stream. */
    private void readRequest(SelectionKey key) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer readBuffer = ByteBuffer.allocate(1024); // heap buffer
        int readBytes = sc.read(readBuffer);
        if (readBytes > 0) {
            // flip(): limit = bytes read, position = 0, so remaining() is the payload length.
            readBuffer.flip();
            byte[] bytes = new byte[readBuffer.remaining()];
            readBuffer.get(bytes);
            String body = new String(bytes, UTF_8);
            System.out.println("The time server receive order : " + body);
            String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
                    ? new Date(System.currentTimeMillis()).toString()
                    : "BAD ORDER";
            doWrite(sc, currentTime);
        } else if (readBytes < 0) {
            // The peer closed the connection.
            key.cancel();
            sc.close();
        }
        // readBytes == 0: nothing was read; ignore.
    }

    /**
     * Sends {@code response} to the client encoded as UTF-8. Blank or null
     * responses are not sent.
     *
     * <p>NOTE: a single non-blocking write is issued; if the socket accepts
     * only part of the buffer, the remainder is not retried.
     *
     * @param channel  client channel to write to
     * @param response text to send
     * @throws IOException if the write fails
     */
    private void doWrite(SocketChannel channel, String response) throws IOException {
        if (response != null && response.trim().length() > 0) {
            ByteBuffer writeBuffer = ByteBuffer.wrap(response.getBytes(UTF_8));
            channel.write(writeBuffer);
        }
    }
}

3,TimeClient 客户端启动程序

/**
 * Created by ju on 2017-06-07.
 *
 * <p>Entry point of the NIO time client: resolves the server port and runs a
 * {@code TimeClientHandler} against 127.0.0.1 on its own thread.
 */
public class TimeClient {

    /**
     * Starts the time client.
     *
     * @param args optional; {@code args[0]} is the server port. When it is absent
     *             the port is 9999; when it is not a valid integer the stack
     *             trace is printed and 9999 is used.
     */
    public static void main(String[] args) {
        int port = 9999;
        if (args != null && args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // Report the bad argument and keep the default port instead of crashing.
                e.printStackTrace();
            }
        }
        new Thread(new TimeClientHandler("127.0.0.1", port)).start();
    }
}

4,客户端NIO的具体实现程序

/**
 * Created by ju on 2017-06-07.
 *
 * <p>NIO time client. Connects to the server without blocking, sends the text
 * {@code QUERY TIME ORDER}, prints the first response it receives and then
 * terminates. The loop also terminates when the connection fails or the
 * server closes the connection, and releases the channel and selector on exit.
 */
public class TimeClientHandler implements Runnable {

    /** Charset used for both encoding the request and decoding the response. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    private final int port;

    private final String host;

    private Selector selector;

    private SocketChannel channel;

    private volatile boolean stop;

    /**
     * Opens the selector and a non-blocking socket channel.
     *
     * <p>If either cannot be opened, the stack trace is printed and the JVM
     * exits with status 1, because {@link #run()} cannot work without them.
     *
     * @param host server host; {@code null} means 127.0.0.1
     * @param port server port
     */
    public TimeClientHandler(String host, int port) {
        this.host = host == null ? "127.0.0.1" : host;
        this.port = port;
        try {
            selector = Selector.open();
            channel = SocketChannel.open();
            channel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Initiates the connection and runs the selection loop until a response
     * has been printed or the connection is lost, then closes the channel and
     * the selector.
     */
    @Override
    public void run() {
        try {
            doConnect();
        } catch (IOException e1) {
            e1.printStackTrace();
            System.exit(1);
        }
        while (!stop) {
            try {
                // The selector wakes up at least once per second.
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectedKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        // The only channel is unusable (e.g. connection refused):
                        // report it and end the loop instead of spinning forever.
                        e.printStackTrace();
                        stop = true;
                        key.cancel();
                        if (key.channel() != null) {
                            key.channel().close();
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        // Closing the selector does not close the channel, so close it explicitly.
        if (channel != null) {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            selector = null;
        }
    }

    /**
     * Handles a ready key: completes the connection and sends the request, or
     * reads and prints the response.
     *
     * @param key the selected key
     * @throws IOException if connecting, reading or writing fails
     */
    public void handleInput(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }
        SocketChannel sc = (SocketChannel) key.channel();
        if (key.isConnectable()) {
            if (sc.finishConnect()) {
                sc.register(selector, SelectionKey.OP_READ);
                doWrite(sc);
            } else {
                System.exit(1); // connection failed, exit the process
            }
        }
        if (key.isReadable()) {
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            int readBytes = sc.read(readBuffer);
            if (readBytes > 0) {
                readBuffer.flip();
                byte[] bytes = new byte[readBuffer.remaining()];
                readBuffer.get(bytes);
                String body = new String(bytes, UTF_8);
                System.out.println("Now is : " + body);
                this.stop = true;
            } else if (readBytes < 0) {
                // The peer closed the connection; no response will ever arrive.
                key.cancel();
                sc.close();
                this.stop = true;
            }
            // readBytes == 0: nothing was read; ignore.
        }
    }

    /**
     * Starts the non-blocking connect. If it completes immediately the request
     * is sent right away; otherwise the channel is registered for connect
     * events and the request is sent from {@link #handleInput(SelectionKey)}.
     */
    private void doConnect() throws IOException {
        if (channel.connect(new InetSocketAddress(host, port))) {
            channel.register(selector, SelectionKey.OP_READ);
            doWrite(channel);
        } else {
            channel.register(selector, SelectionKey.OP_CONNECT);
        }
    }

    /**
     * Sends the query command encoded as UTF-8 and reports success when the
     * whole buffer was written by the single non-blocking write.
     *
     * <p>NOTE: if the socket accepts only part of the buffer, the remainder is
     * not retried and no success message is printed.
     */
    private void doWrite(SocketChannel schannel) throws IOException {
        ByteBuffer buff = ByteBuffer.wrap("QUERY TIME ORDER".getBytes(UTF_8));
        schannel.write(buff);
        if (!buff.hasRemaining()) {
            System.out.println("SEND SUCCESS!");
        }
    }
}
转载请注明原文地址: https://www.6miu.com/read-55349.html

最新回复(0)