JavaNIO--2.实现ECHO服务器

xiaoxiao2021-02-28  37

JavaNIO技术实现ECHO服务器 1.NIO核心组件的使用 1.1初始化NIO组件1.2Accept组件1.3SelectLoop(核心组件) 2.NIO通道读写(Buffer) 2.1读取通道内容2.2Buffer处理辅助方法 3.测试结果4.完整代码

JavaNIO技术实现ECHO服务器

所谓ECHO服务器就是客户端发送到服务器端什么内容,服务器端就返回什么内容的一种服务器,者几乎是最简单的网络服务器(当然还有更简单的抛弃服务器)

阅读需要基础:JavaNIO基础

1.NIO核心组件的使用

NIO核心组件主要包括Selector和Channel,而Buffer主要用于和Channel进行数据交互,所以不在此作详细的使用介绍。

1.1初始化NIO组件

public class NioServer { private Selector selector; private ServerSocketChannel serverSocketChannel; private int port; public NioServer(int port) throws IOException { // 打开一个ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 设置为非阻塞模式才能注册到Selector serverSocketChannel.configureBlocking(false); // 打开一个选择器 selector = Selector.open(); this.port = port; } // 启动服务器的方法 private void startServer() { try { serverSocketChannel.bind(new InetSocketAddress(port)); // 注册该通道到选择器,注意兴趣操作是SelectionKey.OP_ACCEPT serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); selectLoop(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }

需要了解的是Channel需要设置为非阻塞模式才能注册到选择器

Channel调用register()方法时需要指定兴趣操作,意思就是该选择器会监听这个通道有没有准备好可以执行的操作,兴趣操作有:SelectionKey.OP_ACCEPT,SelectionKey.OP_READ,SelectionKey.OP_WRITE,SelectionKey.OP_CONNECT,分别对应的是ServerSocketChannel的accept()方法可以执行(不需阻塞),SocketChannel的read()/write()方法可以执行(不需阻塞),以及SocketChannel内含的Socket的connect()方法可以调用(不需阻塞)。

如果不太了解NIO对应的操作模型,可以去参考我的上一篇博客:IO多路复用和NIO

1.2Accept组件

private void acceptClient(SelectionKey selectionKey) throws IOException { // 与对端Socket建立连接 SocketChannel socketChannel = serverSocketChannel.accept(); if (socketChannel != null) { System.err.println("接收到一个连接,对端IP为:"+socketChannel.socket().getInetAddress()); } // 将接收到的SocketChannel注册到Selector,注意此时通道要设置为非阻塞模式,且兴趣操作为OP_READ socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); }

使用和传统ServerSocket的accept()方法流程一致,需要注意的是,传统的accept()调用时会阻塞直到建立一个TCP连接,而使用Selector选择器可以避免阻塞,确保调用该方法时一定有一个(或多个)Socket连接已经在等待建立。

1.3SelectLoop(核心组件)

可以看到一个java.nio.channels.Selector可以注册多个通道,Selector可以监听注册到自身的通道的状态。

private void selectLoop() throws IOException { while(true) { // select()方法会阻塞,直到有注册到该选择器的通道有兴趣事件准备完毕 selector.select(); // selectedKeys()方法会获得有兴趣事件发生的通道,注册得到的SelectionKey的集合 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while(iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); // 循环判断其中的key if (selectionKey.isAcceptable()) { // 如果key处于可接受状态,就进入接收函数 acceptClient(selectionKey); }else if(selectionKey.isReadable()) { // 如果key处于可读状态,就进入读函数 readDate(selectionKey); } } // 每次处理完通道事件以后,要进行一次清空 selectionKeys.clear(); } }

可以看到,通过调用选择器的select()会不断的得到将要发生事件通道,只要是注册到该选择器的通道,都会被轮询一次,而我们通过while循环,可以做到单线程无阻塞I/O。

2.NIO通道读写(Buffer)

2.1读取通道内容

private void readDate(SelectionKey selectionKey) throws IOException { // 每一次都先获取之前绑定在这个key上的buffer ByteBuffer oldBuffer = (ByteBuffer)selectionKey.attachment(); SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); ByteBuffer newBuffer = ByteBuffer.allocate(64); int read; while((read = socketChannel.read(newBuffer))<=0) { return; } newBuffer.flip(); // 读取Buffer,看是否有换行符 String line = readLine(newBuffer); if (line != null) { // 如果这次读到了行结束符,就将原来不含有行结束符的buffer合并位一行 String sendData = readLine(mergeBuffer(oldBuffer, newBuffer)); if (readLineContent(sendData).equalsIgnoreCase("exit")) { // 如果这一行的内容是exit就断开连接 socketChannel.close(); return; } // 然后直接发送回到客户端 ByteBuffer sendBuffer = ByteBuffer.wrap(sendData.getBytes("utf-8")); while (sendBuffer.hasRemaining()) { socketChannel.write(sendBuffer); } selectionKey.attach(null); }else { // 如果这次没读到行结束付,就将这次读的内容和原来的内容合并,并刷新绑定到key对象上 selectionKey.attach(mergeBuffer(oldBuffer, newBuffer)); } }

2.2Buffer处理辅助方法

/** * 读取ByteBuffer直到一行的末尾 * 返回这一行的内容,包括换行符 * * @param buffer * @return String 读取到行末的内容,包括换行符 ; null 如果没有换行符 * @throws UnsupportedEncodingException */ private String readLine(ByteBuffer buffer) throws UnsupportedEncodingException { // windows中的换行符表示手段 "\r\n" // 基于windows的软件发送的换行符是会是CR和LF char CR = '\r'; char LF = '\n'; boolean crFound = false; int index = 0; int len = buffer.limit(); buffer.rewind(); while(index < len) { byte temp = buffer.get(); if (temp == CR) { crFound = true; } if (crFound && temp == LF) { // Arrays.copyOf(srcArr,length)方法会返回一个 源数组中的长度到length位 的新数组 return new String(Arrays.copyOf(buffer.array(), index+1),"utf-8"); } index ++; } return null; } /** * 获取一行的内容,不包括换行符 * @param buffer * @return String 行的内容 * @throws UnsupportedEncodingException */ private String readLineContent(String line) throws UnsupportedEncodingException { return line.substring(0, line.length() - 2); } /** * 对传入的Buffer进行拼接 * @param oldBuffer * @param newBuffer * @return ByteBuffer 拼接后的Buffer */ public static ByteBuffer mergeBuffer(ByteBuffer oldBuffer,ByteBuffer newBuffer) { // 如果原来的Buffer是null就直接返回 if (oldBuffer == null) { return newBuffer; } // 如果原来的Buffer的剩余长度可容纳新的buffer则直接拼接 newBuffer.rewind(); if (oldBuffer.remaining() > (newBuffer.limit()-newBuffer.position())) { return oldBuffer.put(newBuffer); } // 如果不是以上两种情况就构建新的Buffer进行拼接 int oldSize = oldBuffer != null?oldBuffer.limit():0; int newSize = newBuffer != null?newBuffer.limit():0; ByteBuffer result = ByteBuffer.allocate(oldSize+newSize); result.put(Arrays.copyOfRange(oldBuffer.array(), 0, oldSize)); result.put(Arrays.copyOfRange(newBuffer.array(), 0, newSize)); return result; }

这些代码是为了实现ECHO返回而实现的辅助方法,主要是进行Buffer的处理。

3.测试结果

使用telnet进行连接测试,实现了ECHO服务器的功能,而且输入exit会关闭该连接。

4.完整代码

import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Arrays; import java.util.Iterator; import java.util.Set; public class NioServer { private Selector selector; private ServerSocketChannel serverSocketChannel; private int port; public NioServer(int port) throws IOException { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); selector = Selector.open(); this.port = port; } private void selectLoop() throws IOException { while(true) { // select()方法会阻塞,直到有注册到该选择器的通道有兴趣事件发生 selector.select(); // selectedKeys()方法会获得有兴趣事件发生的通道,注册得到的SelectionKey的集合 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while(iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); // 循环判断其中的key if (selectionKey.isAcceptable()) { // 如果key处于可接受状态,就进入接收函数 acceptClient(selectionKey); }else if(selectionKey.isReadable()) { // 如果key处于可读状态,就进入读函数 readDate(selectionKey); } } selectionKeys.clear(); } } /** * 接收连接并将建立的通道注册到选择器 * * @param selectionKey * @throws IOException */ private void acceptClient(SelectionKey selectionKey) throws IOException { // 与对端Socket建立连接 SocketChannel socketChannel = serverSocketChannel.accept(); if (socketChannel != null) { System.err.println("接收到一个连接,对端IP为:"+socketChannel.socket().getInetAddress()); } // 将接收到的SocketChannel注册到Selector,注意此时通道要设置为非阻塞模式,且兴趣操作为OP_READ socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); } private void readDate(SelectionKey selectionKey) throws IOException { // 每一次都先获取之前绑定在这个key上的buffer ByteBuffer oldBuffer = (ByteBuffer)selectionKey.attachment(); SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); ByteBuffer newBuffer = ByteBuffer.allocate(64); int read; while((read = socketChannel.read(newBuffer))<=0) { return; } newBuffer.flip(); String line = readLine(newBuffer); if (line != null) { // 如果这次读到了行结束符,就将原来不含有行结束符的buffer合并位一行 String sendData = readLine(mergeBuffer(oldBuffer, newBuffer)); if (readLineContent(sendData).equalsIgnoreCase("exit")) { // 如果这一行的内容是exit就断开连接 socketChannel.close(); return; } // 然后直接发送回到客户端 ByteBuffer sendBuffer = ByteBuffer.wrap(sendData.getBytes("utf-8")); while (sendBuffer.hasRemaining()) { socketChannel.write(sendBuffer); } selectionKey.attach(null); }else { // 如果这次没读到行结束付,就将这次读的内容和原来的内容合并,并刷新绑定到key对象上 selectionKey.attach(mergeBuffer(oldBuffer, newBuffer)); } } /** * 读取ByteBuffer直到一行的末尾 * 返回这一行的内容,包括换行符 * * @param buffer * @return String 读取到行末的内容,包括换行符 ; null 如果没有换行符 * @throws UnsupportedEncodingException */ private String readLine(ByteBuffer buffer) throws UnsupportedEncodingException { // windows中的换行符表示手段 "\r\n" // 基于windows的软件发送的换行符是会是CR和LF char CR = '\r'; char LF = '\n'; boolean crFound = false; int index = 0; int len = buffer.limit(); buffer.rewind(); while(index < len) { byte temp = buffer.get(); if (temp == CR) { crFound = true; } if (crFound && temp == LF) { // Arrays.copyOf(srcArr,length)方法会返回一个 源数组中的长度到length位 的新数组 return new String(Arrays.copyOf(buffer.array(), index+1),"utf-8"); } index ++; } return null; } /** * 获取一行的内容,不包括换行符 * @param buffer * @return String 行的内容 * @throws UnsupportedEncodingException */ private String readLineContent(String line) throws UnsupportedEncodingException { return line.substring(0, line.length() - 2); } /** * 对传入的Buffer进行拼接 * @param oldBuffer * @param newBuffer * @return ByteBuffer 拼接后的Buffer */ public static ByteBuffer mergeBuffer(ByteBuffer oldBuffer,ByteBuffer newBuffer) { // 如果原来的Buffer是null就直接返回 if (oldBuffer == null) { return newBuffer; } // 如果原来的Buffer的剩余长度可容纳新的buffer则直接拼接 newBuffer.rewind(); if (oldBuffer.remaining() > (newBuffer.limit()-newBuffer.position())) { return oldBuffer.put(newBuffer); } // 如果不是以上两种情况就构建新的Buffer进行拼接 int oldSize = oldBuffer != null?oldBuffer.limit():0; int newSize = newBuffer != null?newBuffer.limit():0; ByteBuffer result = ByteBuffer.allocate(oldSize+newSize); result.put(Arrays.copyOfRange(oldBuffer.array(), 0, oldSize)); result.put(Arrays.copyOfRange(newBuffer.array(), 0, newSize)); return result; } private void startServer() { try { serverSocketChannel.bind(new InetSocketAddress(port)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); selectLoop(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public static void main(String[] args) throws UnsupportedEncodingException { try { new NioServer(12345).startServer(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
转载请注明原文地址: https://www.6miu.com/read-2623844.html

最新回复(0)