在前面的文章中我们讲过Socket的读是阻塞模式,并且读写都是一个字节或者几个字节,为了提高响应性能往往需要在服务端为每一个请求分配一个线程。后来为了解决传统Socket的这种阻塞低效方式,在jdk1.4之后引入了New I/o(NIO)。
NIO网络编程中比较重要的概念是缓冲区、通道、选择器,NIO它是基于缓存区的操作,一次能读写一个或者多个数据块,通道可以以阻塞(blocking)或非阻塞(nonblocking)模式运行。非阻塞模式的通道永远不会让调用的线程休眠。请求的操作要么立即完成,要么返回一个结果表明未进行任何操作。在NIO通道的读写两端能直接操作的就是ByteBuffer,缓冲区是可以在不同通道或者同一个通道的读写端共用的。与缓冲区不同,通道不能被重复使用,一个打开的通道即代表与一个特定I/O服务的特定连接并封装该连接的状态。当通道关闭时,那个连接会丢失,然后通道将不再连接任何东西。
Socket和SocketChannel类封装点对点、有序的网络连接,SocketChannel扮演客户端发起同一个监听服务器的连接。直到连接成功,它才能收到数据并且只会从连接到的地址接收,每个SocketChannel对象创建时都是同一个对等的java.net.Socket对象串联的。 而在新创建的SocketChannel上调用socket( )方法能返回它对等的Socket对象;在该Socket上调用getChannel( )方法则能返回最初的那个SocketChannel。 如果选择使用通过在对等Socket对象上调用connect( )方法与服务端建立连接,那么线程在连接建立好或超时过期之前都将保持阻塞。如果您选择通过在通道上直接调用connect( )方法来建立连接并且通道处于阻塞模式(默认模式),那么使用传统Socket连接过程实际上是一样的。在SocketChannel上并没有一种connect( )方法可以让您指定超时(timeout)值,当connect( )方法在非阻塞模式下被调用时SocketChannel提供并发连接:它发起对请求地址的连接并且立即返回值。如果返回值是true,说明连接立即建立了(这可能是本地环回连接);如果连接不能立即建立,connect( )方法会返回false且并发地继续连接建立过程。
而在服务端ServerSocketChannel扮演者服务端通道的角色,它负责监听服务器上的一个连接,在创建服务端通道的时候需要调用对等的ServerSocket对象绑定到指定的端口上。在传统的基于流的Socket网络编程中,服务端为每一个请求创建一个线程用于读写数据,而使用ServerSocketChannel在服务端编程,我们往往配合Selector选择器使用,在服务端我们将特定的Accpet、Read、Write事件注册到选择器上,由选择器帮我检查操作系统内核是否可读和可写,如有对应的事件满足要求,选择器会通知调用者线程。
相对于传统Socket编程,使用基于缓冲区的NIO编程在如下几点不会阻塞调用者线程:
(1)客户端connect( )方法不会阻塞
(2)服务端accept()方法不会阻塞
(3)Socket的读read()方法不会阻塞
下面我展示一个常用的NIO网络编程的客户端和服务端的代码示例,代码是经过自己测试可用的,其中我添加了大量的注释用于说明代码的意图:
/** * @author yujie.wang * SocketChannel 客户端代码测试 */ public class SocketChannel_Client { private final static String DEFAULT_HOST = "127.0.0.1"; private final static int DEFAULT_PORT = 4567; private SocketChannel channel; private Socket socket; //分配一个大小为50字节的缓冲区 用于客户端通道的读写 private ByteBuffer buffer = ByteBuffer.allocate(50); public SocketChannel_Client(){ this(DEFAULT_HOST, DEFAULT_PORT); } public SocketChannel_Client(String host, int port){ init(host,port); } /** * 打开通道并设置对等的客户端socket对象 * 建立与服务端通道的连接 * @param host * @param port */ public void init(String host, int port){ try { //打开一个客户端通道,同时当前通道并没有与服务端通道建立连接 channel = SocketChannel.open(); //获得对等的客户端socket socket = channel.socket(); //配置客户端socket setSocket(); //将通道设置为非阻塞工作方式 channel.configureBlocking(false); //异步连接,发起连接之后就立即返回 //返回true,连接已经建立 //返回false,后续继续建立连接 channel.connect(new InetSocketAddress(host,port)); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 验证连接是否建立 */ public void finishConnect(){ try { while(!channel.finishConnect()){ // nothing to do,wait connect } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 验证当前连接是否可用 */ public void isConnected(){ try { if(channel == null || !channel.isConnected()) throw new IOException("channel is broken"); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 配置客户端通道对等的Socket */ public void setSocket(){ try { if(socket != null ){ //设置socket 读取的超时时间5秒 //socket.setSoTimeout(5000); //设置小数据包不再组合成大包发送,也不再等待前一个数据包返回确认消息 socket.setTcpNoDelay(true); //设置如果客户端Socket关闭了,未发送的包直接丢弃 socket.setSoLinger(true, 0); } } catch (Exception e) { // TODO: handle exception } } public void write(String data) { buffer.clear(); buffer.put(data.getBytes()); buffer.flip(); try { // write并不一定能一次将buffer中的数据都写入 所以这里要多次写入 // 当多个线程同时调用同一个通道的写方法时,只有一个线程能工作,其他现在则会阻塞 while(buffer.hasRemaining()){ channel.write(buffer); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void read(){ try { buffer.clear(); // read方法并不阻塞,如果有数据读入返回读入的字节数,没有数据读入返回0 ,遇到流的末尾返回-1 // 当然这里和Socket和ServerSocket通信一样 也会存在消息无边界的问题 我们这里就采取简单的读取一次作为示例 System.out.println("read begin"); channel.read(buffer); /* while(buffer.hasRemaining() && channel.read(buffer) != -1){ printBuffer(buffer); }*/ buffer.flip(); printBuffer(buffer); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 输出buffer中的数据 * @param buffer */ public void printBuffer(ByteBuffer buffer){ while(buffer.hasRemaining()){ System.out.print((char)buffer.get()); } System.out.println(""); System.out.println("****** Read end ******"); } /** * 判断通道是否打开 * @return */ public boolean isChannelOpen(){ try { return channel.finishConnect() ? true : false; } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } return false; } /** * 关闭通道 */ public void closeChannel(){ if(channel != null){ try { channel.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public static void main(String[] args) { // TODO Auto-generated method stub // client(DEFAULT_HOST,DEFAULT_PORT); SocketChannel_Client client = new SocketChannel_Client(); client.finishConnect(); System.out.println("connect success"); client.write("Hello World"); System.out.println("client write end"); client.read(); sleep(15000); System.out.println("client exit"); } public static void sleep(long time){ try { Thread.sleep(time); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
服务端代码示例:
/** * @author yujie.wang * ServerSocketChannel 测试用例 */ public class ServerSocketChannel_Server { private final static int DEFAULT_PORT = 4567; private ServerSocketChannel channel; private Selector selector; private ServerSocket serverSocket; public ServerSocketChannel_Server(){ this(DEFAULT_PORT); } public ServerSocketChannel_Server(int port){ init(port); } public static void main(String[] args) { // TODO Auto-generated method stub ServerSocketChannel_Server server = new ServerSocketChannel_Server(); server.selector(); System.out.println("server exit"); } public void init(int port){ try { //打开一个服务端通道 channel = ServerSocketChannel.open(); //获得对等的ServerSocket对象 serverSocket = channel.socket(); //将服务端ServerSocket绑定到指定端口 serverSocket.bind(new InetSocketAddress(port)); System.out.println("Server listening on port: "+ port); //将通道设置为非阻塞模式 channel.configureBlocking(false); //打开一个选择器 selector = Selector.open(); //将通道注册到打开的选择器上 channel.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void selector(){ try { while(true){ System.out.println("begin to select"); //select()方法会阻塞,直到有准备就绪的通道有准备好的操作;或者当前线程中断该方法也会返回 //这里的返回值不是选择器中已选择键集合中键的数量,而是从上一次select()方法调用到这次调用期间进入就绪状态通道的数量 int readyKeyCount = selector.select(); if(readyKeyCount <= 0){ continue; } System.out.println("ok select readyCount: "+ readyKeyCount); //获得已选择键的集合这个集合中包含了 新准备就绪的通道和上次调用select()方法已经存在的就绪通道 Set<SelectionKey> set = selector.selectedKeys(); Iterator<SelectionKey> iterator = set.iterator(); while(iterator.hasNext()){ SelectionKey key = iterator.next(); //通过调用remove将这个键key从已选择键的集合中删除 iterator.remove(); if(key.isAcceptable()){ handleAccept(key); }else if(key.isReadable()){ handleRead(key); }else if(key.isWritable()){ handleWrite(key,"Hello World"); } } } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 处理客户端连接事件 * @param key * @param selector */ public void handleAccept(SelectionKey key){ try { //因为能注册SelectionKey.OP_ACCEPT事件的只有 ServerSocketChannel通道, //所以这里可以直接转换成ServerSocketChannel ServerSocketChannel channel = (ServerSocketChannel)key.channel(); //获得客户端的SocketChannel对象 ,accept这里不会阻塞,如果没有连接到来,这里会返回null SocketChannel client = channel.accept(); System.out.println("Accepted Connected from: "+ client); //将客户端socketChannel设置为非阻塞模式 client.configureBlocking(false); //为该客户端socket分配一个ByteBuffer ByteBuffer buffer = ByteBuffer.allocate(50); client.register(selector, SelectionKey.OP_READ, buffer); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 处理读取数据事件 * @param key */ public void handleRead(SelectionKey key){ try { //从键中获得相应的客户端socketChannel SocketChannel channel = (SocketChannel)key.channel(); //获得与客户端socketChannel关联的buffer ByteBuffer buffer = (ByteBuffer)key.attachment(); buffer.clear(); //将数据读取到buffer中,这里read方法不会阻塞 //有数据返回读取的字节数,没有数据返回0,遇到流的末尾则返回-1 //这里为了避免消息的无边界 性 ,所以只读取一次数据 int count = channel.read(buffer); System.out.println("read count:"+ count); buffer.flip(); //输出数据 printBuffer(buffer); buffer.clear(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 处理写入数据的时间 * @param key * @param data */ public void handleWrite(SelectionKey key,String data){ try { SocketChannel channel = (SocketChannel)key.channel(); ByteBuffer buffer = (ByteBuffer)key.attachment(); buffer.clear(); buffer.put(data.getBytes()); buffer.flip(); while(buffer.hasRemaining()){ channel.write(buffer); } buffer.clear(); } catch (Exception e) { // TODO: handle exception } } public static void printBuffer(ByteBuffer buffer){ while(buffer.hasRemaining()){ System.out.println("positon: " + buffer.position()+ " limit:"+ buffer.limit()); System.out.print((char)buffer.get()); } System.out.println(""); System.out.println("****** Read end ******"); System.out.println(""); } }