使用javaNIO实现CS模式的通信

xiaoxiao2021-02-28  98

NIO使用非阻塞IO的方式实现,服务器与客户端的交流,适用于大量连接,而数据量少的情况。通过一个线程轮询所有的通道,处理注册的事件,而主线程可以继续干其他的事情。这样所有的I/O都交给一个线程处理,减少了线程IO的切换。如果具体学习NIO的架构和原理请点击下面的连接

点击打开链接 http://ifeve.com/selectors/

以下为一个使用NIO实现的C/S通信模式,对应简单例子学习,可以更容易入手。

服务端代码如下:

主线程类:MainChannel

import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; public class MainChannel { public static void main(String[] args) { // TODO Auto-generated method stub // 声明服务器监听通道 和select选择器 ServerSocketChannel serverChannel = null; Selector selector = null; try { // 实例化selector 此种实例化模式说明 selector 是单例的 selector = Selector.open(); // 实例化服务器监听端口 serverChannel = ServerSocketChannel.open(); // 绑定监听地址。 serverChannel.socket().bind(new InetSocketAddress(8881)); // 设置channel为非阻塞模式,一定要非阻塞模式才能注册到selector中 serverChannel.configureBlocking(false); // 把监听通道注册到选择器中, 监听此通道的连接事件。SelectionKey.OP_ACCEPT 指定为连接事件。 serverChannel.register(selector, SelectionKey.OP_ACCEPT); //所有通道,交给通道管理器,统一管理 ChannelManager.addChannel(serverChannel); // 开启一个线程负责管理selector,并轮询是否有注册监听的事件就绪。 Thread thread = new Thread(new ServerNIO(selector)); thread.start(); // 然后主线程 就可以干其他的事情了。不管客户端连接 还是I/O // 都不会阻塞此线程,只会阻塞selector管理线程,且只在等待事件发生时阻塞。 } catch (IOException e) { // TODO Auto-generated catch block System.out.println("服务器监听发生异常"); } } } 管理selector的线程类:

import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class ServerNIO implements Runnable { // 客户端通道,用于给某个客户端收发数据 private SocketChannel socketChannel; // 缓冲区,用户收发数据,面向通道。 private ByteBuffer buf = ByteBuffer.allocate(1024); // 选择器,从主线程注入 private Selector selector; // 指定线程是否轮询 private boolean flag = true; // 构造器中注入selector public ServerNIO(Selector selector) { // TODO Auto-generated constructor stub this.selector = selector; } // 开启线程等待事件的发生,轮询通道。 @Override public void run() { // TODO Auto-generated method stub try { while (flag) { /** * 获取等待的事件就绪的通道,如果没有通道有事件就绪有三种情况 * 1. 直接返回:selectNow(); * 2. 超时返回:select(int timeout); * 3. 阻 塞: select(); */ int nums = selector.select(); // 阻塞模式 if (nums > 0) { // 阻塞模式下 此次判定多余 // 当事件发生了,获取发生事件通道的key集合; Set<SelectionKey> selectKeys = selector.selectedKeys(); // 迭代遍历这个keys集合 Iterator<SelectionKey> iter = selectKeys.iterator(); while (iter.hasNext()) { // 获取单个通道的key SelectionKey key = iter.next(); // 如果是读取事件就绪。说明有客户端向服务器发送数据了。 if (key.isReadable()) { // 先获取到客户端channel socketChannel = (SocketChannel) key.channel(); buf.clear(); // 利用buffer读取数据。 int len = socketChannel.read(buf); if (len > 0) { byte[] str = new byte[len]; buf.rewind(); buf.get(str, 0, len); buf.clear(); System.out.println("获取客户端数据:" + new String(str)); // 给客户端回复数据 String temp = "服务器回复: 已经收到您发送的数据,祝您一路平安!"; buf.clear(); buf.put(temp.getBytes()); buf.flip(); socketChannel.write(buf); System.out.println("已经向客户端回复"); //此处可以利用ChannelManager向其他所有通道广播数据。只要在ChannelManager中写一个广播方法即可 } } else if (key.isAcceptable()) { // 如果是接受客户端连接就绪 // 从key中获取对应的通道 ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); // 接受这个连接。 SocketChannel socketChannel = serverChannel.accept(); // 如果连接不为空,则控制台打印 连接的客户端地址。 if (socketChannel != null) { // 由于关闭selector的时候,并不会关闭通道,最好使用一个容器,将通道都保存起来 //然后开启心跳连接,如果通道出现异常则关闭此通道,当应用程序关闭的时候,关闭所有的通道。 ChannelManager.addChannel(socketChannel); System.out.println(String.format("接收到客户端的连接:IP[%s]-端口[%s]", socketChannel.socket().getPort(), socketChannel.socket().getInetAddress())); // 把这个通道设置为非阻塞模式,然后又注册到selector中,等待事件发生 socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); } } //写事件在缓冲区直接达到阈值时候出发,一般不注册写事件。 // 通道处理完毕后 将此通道删除。如果下次又有此时间,会有一个新的key,所以记得删除处理过的key。 iter.remove(); } } } } catch (IOException e) { System.out.println("服务器断开连接"); } try { // 注意此处只会关闭,selector使注册到琪上面的selectionKeys无效,通道本身不会关闭。 selector.close(); ChannelManager.closeChannles(); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } } 通道管理类:

import java.io.IOException; import java.nio.channels.Channel; import java.util.LinkedList; public class ChannelManager { private static LinkedList<Channel> list = new LinkedList<>(); private static Thread thread; //用于开启心跳测试通道连接,实现省略 public static void addChannel(Channel channel) { list.add(channel); } //关闭所有的通道连接 public static void closeChannles() { for (Channel channel : list) { try { channel.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); System.out.println("关闭通道失败"); } list.remove(); } } } 一个简单的客户端类: 这个脚本可以直接通过cmd 编译运行。

import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.Scanner; public class MainChannel { public static void main(String[] args) { // TODO Auto-generated method stub SocketChannel socketChannel = null; boolean flag = true; Scanner in = new Scanner(System.in); ByteBuffer buffer = ByteBuffer.allocate(1024); try { socketChannel = SocketChannel.open(); // 设置连接为非阻塞模式,可能未连接完成就返回 // socketChannel.configureBlocking(false); socketChannel.socket().connect(new InetSocketAddress("192.168.1.100", 8881)); // 判断是否连接成功,等待连接成功 while (!socketChannel.finishConnect()) { } while (flag) { String temp = in.nextLine(); buffer.clear(); buffer.put(temp.getBytes()); // limit指针 移动到 position位置 buffer.flip(); // 当buffer中有足够空间,则写到buffer中 while (buffer.hasRemaining()) socketChannel.write(buffer); if ("exit".equals(temp)) flag = false; } } catch (IOException e) { // TODO Auto-generated catch block System.out.println("与服务断开连接"); } if (socketChannel != null) { try { socketChannel.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } 上述代码,可以实现客户端向服务器发送消息,然后服务器接受到消息,在自己的控制台打印输出。

转载请注明原文地址: https://www.6miu.com/read-69489.html

最新回复(0)