JavaNIO技术实现ECHO服务器
1.NIO核心组件的使用
1.1初始化NIO组件1.2Accept组件1.3SelectLoop(核心组件) 2.NIO通道读写(Buffer)
2.1读取通道内容2.2Buffer处理辅助方法 3.测试结果4.完整代码
JavaNIO技术实现ECHO服务器
所谓ECHO服务器就是客户端发送到服务器端什么内容,服务器端就返回什么内容的一种服务器,者几乎是最简单的网络服务器(当然还有更简单的抛弃服务器)
阅读需要基础:JavaNIO基础
1.NIO核心组件的使用
NIO核心组件主要包括Selector和Channel,而Buffer主要用于和Channel进行数据交互,所以不在此作详细的使用介绍。
1.1初始化NIO组件
public class NioServer {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private int port;
public NioServer(
int port) throws IOException {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(
false);
selector = Selector.open();
this.port = port;
}
private void startServer() {
try {
serverSocketChannel.bind(
new InetSocketAddress(port));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
selectLoop();
}
catch (IOException e) {
e.printStackTrace();
}
}
}
需要了解的是Channel需要设置为非阻塞模式才能注册到选择器
Channel调用register()方法时需要指定兴趣操作,意思就是该选择器会监听这个通道有没有准备好可以执行的操作,兴趣操作有:SelectionKey.OP_ACCEPT,SelectionKey.OP_READ,SelectionKey.OP_WRITE,SelectionKey.OP_CONNECT,分别对应的是ServerSocketChannel的accept()方法可以执行(不需阻塞),SocketChannel的read()/write()方法可以执行(不需阻塞),以及SocketChannel内含的Socket的connect()方法可以调用(不需阻塞)。
如果不太了解NIO对应的操作模型,可以去参考我的上一篇博客:IO多路复用和NIO
1.2Accept组件
private void acceptClient(SelectionKey selectionKey)
throws IOException {
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel !=
null) {
System.err.println(
"接收到一个连接,对端IP为:"+socketChannel.socket().getInetAddress());
}
socketChannel.configureBlocking(
false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
使用和传统ServerSocket的accept()方法流程一致,需要注意的是,传统的accept()调用时会阻塞直到建立一个TCP连接,而使用Selector选择器可以避免阻塞,确保调用该方法时一定有一个(或多个)Socket连接已经在等待建立。
1.3SelectLoop(核心组件)
可以看到一个java.nio.channels.Selector可以注册多个通道,Selector可以监听注册到自身的通道的状态。
private void selectLoop() throws IOException {
while(
true) {
selector.
select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while(iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
acceptClient(selectionKey);
}
else if(selectionKey.isReadable()) {
readDate(selectionKey);
}
}
selectionKeys.clear();
}
}
可以看到,通过调用选择器的select()会不断的得到将要发生事件通道,只要是注册到该选择器的通道,都会被轮询一次,而我们通过while循环,可以做到单线程无阻塞I/O。
2.NIO通道读写(Buffer)
2.1读取通道内容
private void readDate(SelectionKey selectionKey)
throws IOException {
ByteBuffer oldBuffer = (ByteBuffer)selectionKey.attachment();
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer newBuffer = ByteBuffer.allocate(
64);
int read;
while((read = socketChannel.read(newBuffer))<=
0) {
return;
}
newBuffer.flip();
String line = readLine(newBuffer);
if (line !=
null) {
String sendData = readLine(mergeBuffer(oldBuffer, newBuffer));
if (readLineContent(sendData).equalsIgnoreCase(
"exit")) {
socketChannel.close();
return;
}
ByteBuffer sendBuffer = ByteBuffer.wrap(sendData.getBytes(
"utf-8"));
while (sendBuffer.hasRemaining()) {
socketChannel.write(sendBuffer);
}
selectionKey.attach(
null);
}
else {
selectionKey.attach(mergeBuffer(oldBuffer, newBuffer));
}
}
2.2Buffer处理辅助方法
/**
* 读取ByteBuffer直到一行的末尾
* 返回这一行的内容,包括换行符
*
* @param buffer
* @return String 读取到行末的内容,包括换行符 ; null 如果没有换行符
* @throws UnsupportedEncodingException
*/
private String
readLine(ByteBuffer buffer)
throws UnsupportedEncodingException {
char CR =
'\r';
char LF =
'\n';
boolean crFound =
false;
int index =
0;
int len = buffer.limit();
buffer.rewind();
while(index < len) {
byte temp = buffer.get();
if (temp == CR) {
crFound =
true;
}
if (crFound && temp == LF) {
return new String(Arrays.copyOf(buffer.array(), index+
1),
"utf-8");
}
index ++;
}
return null;
}
/**
* 获取一行的内容,不包括换行符
* @param buffer
* @return String 行的内容
* @throws UnsupportedEncodingException
*/
private String
readLineContent(String line)
throws UnsupportedEncodingException {
return line.substring(
0, line.length() -
2);
}
/**
* 对传入的Buffer进行拼接
* @param oldBuffer
* @param newBuffer
* @return ByteBuffer 拼接后的Buffer
*/
public static ByteBuffer
mergeBuffer(ByteBuffer oldBuffer,ByteBuffer newBuffer) {
if (oldBuffer ==
null) {
return newBuffer;
}
newBuffer.rewind();
if (oldBuffer.remaining() > (newBuffer.limit()-newBuffer.position())) {
return oldBuffer.put(newBuffer);
}
int oldSize = oldBuffer !=
null?oldBuffer.limit():
0;
int newSize = newBuffer !=
null?newBuffer.limit():
0;
ByteBuffer result = ByteBuffer.allocate(oldSize+newSize);
result.put(Arrays.copyOfRange(oldBuffer.array(),
0, oldSize));
result.put(Arrays.copyOfRange(newBuffer.array(),
0, newSize));
return result;
}
这些代码是为了实现ECHO返回而实现的辅助方法,主要是进行Buffer的处理。
3.测试结果
使用telnet进行连接测试,实现了ECHO服务器的功能,而且输入exit会关闭该连接。
4.完整代码
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Set;
public class NioServer {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private int port;
public NioServer(
int port)
throws IOException {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(
false);
selector = Selector.open();
this.port = port;
}
private void selectLoop()
throws IOException {
while(
true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while(iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
acceptClient(selectionKey);
}
else if(selectionKey.isReadable()) {
readDate(selectionKey);
}
}
selectionKeys.clear();
}
}
/**
* 接收连接并将建立的通道注册到选择器
*
* @param selectionKey
* @throws IOException
*/
private void acceptClient(SelectionKey selectionKey)
throws IOException {
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel !=
null) {
System.err.println(
"接收到一个连接,对端IP为:"+socketChannel.socket().getInetAddress());
}
socketChannel.configureBlocking(
false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
private void readDate(SelectionKey selectionKey)
throws IOException {
ByteBuffer oldBuffer = (ByteBuffer)selectionKey.attachment();
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer newBuffer = ByteBuffer.allocate(
64);
int read;
while((read = socketChannel.read(newBuffer))<=
0) {
return;
}
newBuffer.flip();
String line = readLine(newBuffer);
if (line !=
null) {
String sendData = readLine(mergeBuffer(oldBuffer, newBuffer));
if (readLineContent(sendData).equalsIgnoreCase(
"exit")) {
socketChannel.close();
return;
}
ByteBuffer sendBuffer = ByteBuffer.wrap(sendData.getBytes(
"utf-8"));
while (sendBuffer.hasRemaining()) {
socketChannel.write(sendBuffer);
}
selectionKey.attach(
null);
}
else {
selectionKey.attach(mergeBuffer(oldBuffer, newBuffer));
}
}
/**
* 读取ByteBuffer直到一行的末尾
* 返回这一行的内容,包括换行符
*
* @param buffer
* @return String 读取到行末的内容,包括换行符 ; null 如果没有换行符
* @throws UnsupportedEncodingException
*/
private String
readLine(ByteBuffer buffer)
throws UnsupportedEncodingException {
char CR =
'\r';
char LF =
'\n';
boolean crFound =
false;
int index =
0;
int len = buffer.limit();
buffer.rewind();
while(index < len) {
byte temp = buffer.get();
if (temp == CR) {
crFound =
true;
}
if (crFound && temp == LF) {
return new String(Arrays.copyOf(buffer.array(), index+
1),
"utf-8");
}
index ++;
}
return null;
}
/**
* 获取一行的内容,不包括换行符
* @param buffer
* @return String 行的内容
* @throws UnsupportedEncodingException
*/
private String
readLineContent(String line)
throws UnsupportedEncodingException {
return line.substring(
0, line.length() -
2);
}
/**
* 对传入的Buffer进行拼接
* @param oldBuffer
* @param newBuffer
* @return ByteBuffer 拼接后的Buffer
*/
public static ByteBuffer
mergeBuffer(ByteBuffer oldBuffer,ByteBuffer newBuffer) {
if (oldBuffer ==
null) {
return newBuffer;
}
newBuffer.rewind();
if (oldBuffer.remaining() > (newBuffer.limit()-newBuffer.position())) {
return oldBuffer.put(newBuffer);
}
int oldSize = oldBuffer !=
null?oldBuffer.limit():
0;
int newSize = newBuffer !=
null?newBuffer.limit():
0;
ByteBuffer result = ByteBuffer.allocate(oldSize+newSize);
result.put(Arrays.copyOfRange(oldBuffer.array(),
0, oldSize));
result.put(Arrays.copyOfRange(newBuffer.array(),
0, newSize));
return result;
}
private void startServer() {
try {
serverSocketChannel.bind(
new InetSocketAddress(port));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
selectLoop();
}
catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args)
throws UnsupportedEncodingException {
try {
new NioServer(
12345).startServer();
}
catch (IOException e) {
e.printStackTrace();
}
}
}