开场废话

2022.04.05

Java 为了解决 BIO 中,serverSocket.accept() 阻塞的问题,在 JDK1.4 推出了 NIO 弥补不足。

Java NlO (New lO)也有人称之为 java non-blocking IO。NIO 可以完美得替代原本的 IO API,但是实现方式完全不一样。

NIO 可以理解为非阻塞IO,传统IO 的read和write只能阻塞执行,线程在读写期间不能干其他事倩,比如调用socket. read()时,如果服务器一直没有数据传输过来,线程就一直阻塞;

而 NIO 中可以配置socket为非阻塞模式。

NIO 简单介绍

标准IO 是对字节流的读写,在进行IO之前,首先创建一个流对象,流对象进行读写操作都是按字节 ,一个字节一个字节的来读或写。

NIO 把IO抽象成块 ,类似磁盘的读写,每次IO操作的单位都是一个块,块被读入内存之后就是一个byte[],NIO一次可以读或写多个字节,效率也高很多。

NIO有三大核心部分:Channel(通道),Buffer(缓冲区),Selector(选择器)

数据总是从 Channel(通道)读取到 Buffer(缓冲区)中,或者从缓冲区写入到通道中。

Selector(选择器)用于监听多个通道的事件(比如:连接请 求,数据到达等),因此使用单个线程就可以监听多个客户端通道


Buffer 缓冲区

发送给一个通道的所有数据都必须首先放到缓冲区中,同样地,从通道中读取的任何数据都要先读到缓冲区中。

也就是说,不会直接对通道进行读写数据,而是要先经过缓冲区。

缓冲区实质上是一个数组,但它不仅仅是一个数组。缓冲区提供了对数据的结构化访问,而且还可以跟踪系统的读/写进程。


Channel 通道

通道 Channel 是对原 I/O 包中的流的模拟,可以通过它读取和写入数据。

通道与流的不同之处在于,流只能在一个方向上移动(一个流必须是 InputStream 或者 OutputStream 的子类),而通道是双向的,可以用于读、写或者同时用于读写。

通道包括以下类型:

  • FileChannel: 从文件中读写数据;
  • DatagramChannel: 通过 UDP 读写网络中数据;
  • SocketChannel: 通过 TCP 读写网络中数据;
  • ServerSocketChannel: 可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel。

Selector 选择器

Selector 是一个 java NIO组件,可以能够检查一个或多个NIO通道,并确定哪些通道已经准备好进行读取或写入。

这样,一个单独的线程可以管理多个channel,从而管理多个网络连接,提高效率


NIO 代码示例

服务端代码如下,主线程,启动监听:

public static void main(String[] args) throws Exception {
    //记录套接字通道事件
    Selector selector = Selector.open();
    //定义一个异步socket对象
    ServerSocketChannel ssc = ServerSocketChannel.open();
    //设置异步
    ssc.configureBlocking(false);
    //获取socket对象
    ServerSocket socket = ssc.socket();
    //绑定端口
    InetSocketAddress address = new InetSocketAddress(9999);
    socket.bind(address);
    //将事件注册selector对象内
    ssc.register(selector, SelectionKey.OP_ACCEPT);
    System.out.println("端口注册完毕!");

    while (true) {
        //查询事件如果一个事件都没有就阻塞
        selector.select();
        //定义一个byte缓冲区来存储收发的数据
        ByteBuffer echoBuffer = ByteBuffer.allocate(10);
        //处理数据
        accept(selector, echoBuffer);
    }
}

数据处理方法:

//处理数据
public static void accept(Selector selector, ByteBuffer echoBuffer) {
    SocketChannel sc;
    try {
        //此循环遍例所有产生的事件
        for (SelectionKey key : selector.selectedKeys()) {
            //如果产生的事件为接受客户端连接(当有客户端连接服务器的时候产生)
            if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
                selector.selectedKeys().remove(key);
                //定义一个服务器socket通道
                ServerSocketChannel subssc = (ServerSocketChannel) key.channel();
                //将临时socket对象实例化为接收到的客户端的socket
                sc = subssc.accept();
                //将客户端的socket设置为异步
                sc.configureBlocking(false);
                //将客户端的socket的读取事件注册到事件选择器中
                sc.register(selector, SelectionKey.OP_READ);
                //将本此事件从迭带器中删除
                System.out.println("有新连接:" + sc);

                //如果产生的事件为读取数据(当已连接的客户端向服务器发送数据的时候产生)
            } else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
                //将本次事件删除
                selector.selectedKeys().remove(key);
                //临时socket对象实例化为产生本事件的socket
                sc = (SocketChannel) key.channel();
                //定义一个用于存储byte数据的流对象
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                //先将客户端的数据清空
                echoBuffer.clear();
                //a为读取到数据的长度
                try {
                    //循环读取所有客户端数据到byte缓冲区中,当有数据的时候read函数返回数据长度
                    //NIO会自动的将缓冲区一次容纳不下的自动分段
                    int readInt = 0;
                    while ((readInt = sc.read(echoBuffer)) > 0) {
                        //如果获得数据长度比缓冲区大小小的话
                        if (readInt < echoBuffer.capacity()) {
                            //建立一个临时byte数组,将齐长度设为获取的数据的长度
                            byte[] readByte = new byte[readInt];
                            //循环向此临时数组中添加数据
                            for (int i = 0; i < readInt; i++) {
                                readByte[i] = echoBuffer.get(i);
                            }
                            //将此数据存入byte流中
                            bos.write(readByte);
                        } else {
                            //将读取到的数据写入到byte流对象中
                            bos.write(echoBuffer.array());
                        }
                        //将缓冲区清空,以便进行下一次存储数据
                        echoBuffer.clear();
                    }
                    //当循环结束时byte流中已经存储了客户端发送的所有byte数据
                    System.out.println("接收数据: " + bos);
                } catch (Exception e) {
                    //当客户端在读取数据操作执行之前断开连接会产生异常信息
                    e.printStackTrace();
                    //将本socket的事件在选择器中删除
                    key.cancel();
                    break;
                }
                //获取byte流对象的标准byte对象
                byte[] b = bos.toByteArray();
                //建立这个byte对象的ByteBuffer,并将数据存入
                ByteBuffer byteBuffer = ByteBuffer.allocate(b.length);
                byteBuffer.put(b);
                //向客户端写入收到的数据
                write(byteBuffer, sc);
                //关闭客户端连接
                sc.close();
                //将本socket的事件在选择器中删除
                key.cancel();
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        System.out.println("连接结束");
        System.out.println("=============================");
    }
}

数据回写方法:

//写数据
public static void write(ByteBuffer echoBuffer, SocketChannel sc) {
    //将缓冲区复位以便于进行其他读写操作
    echoBuffer.flip();
    try {
        //向客户端写入数据,数据为接受到数据
        sc.write(echoBuffer);
    } catch (IOException e) {
        e.printStackTrace();
        return;
    }
    System.out.println("返回数据: " + new String(echoBuffer.array()));
}

客户端代码:

public static void main(String[] args) {
    NioClient client = new NioClient();
    client.send("66666");
}

//启动连接
public void send(String mssage) {
    try {
        //定义一个记录套接字通道事件的对象
        Selector selector = Selector.open();
        //定义一个服务器地址的对象
        SocketAddress address = new InetSocketAddress("127.0.0.1", 9999);
        //定义异步客户端
        SocketChannel client = SocketChannel.open(address);
        //将客户端设定为异步
        client.configureBlocking(false);
        //在轮讯对象中注册此客户端的读取事件(就是当服务器向此客户端发送数据的时候)
        client.register(selector, SelectionKey.OP_READ);
        //定义用来存储发送数据的byte缓冲区
        ByteBuffer sendbuffer = ByteBuffer.allocate(mssage.length());
        //定义用于接收服务器返回的数据的缓冲区
        ByteBuffer readBuffer = ByteBuffer.allocate(mssage.length());
        //将数据put进缓冲区
        sendbuffer.put(mssage.getBytes(StandardCharsets.UTF_8));
        //将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向服务器,就要复位
        sendbuffer.flip();
        //向服务器发送数据
        client.write(sendbuffer);
        System.out.println("发送数据: " + new String(sendbuffer.array()));

        //利用循环来读取服务器发回的数据
        while (true) {
            //如果客户端连接没有打开就退出循环
            if (!client.isOpen()) {
                break;
            }
            //此方法为查询是否有事件发生如果没有就阻塞,有的话返回事件数量
            int shijian = selector.select();
            //如果没有事件返回循环
            if (shijian == 0) {
                continue;
            }
            this.send(selector, readBuffer, client);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

实际发送数据:

//发送数据
public void send(Selector selector, ByteBuffer readBuffer, SocketChannel client) {
    try {
        //遍例所有的事件
        for (SelectionKey key : selector.selectedKeys()) {
            //删除本次事件
            selector.selectedKeys().remove(key);
            //如果本事件的类型为read时,表示服务器向本客户端发送了数据
            if (key.isReadable()) {
                //将临时客户端对象实例为本事件的socket对象
                SocketChannel sc = (SocketChannel) key.channel();
                //定义一个用于存储所有服务器发送过来的数据
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                //将缓冲区清空以备下次读取
                readBuffer.clear();
                //此循环从本事件的客户端对象读取服务器发送来的数据到缓冲区中
                while (sc.read(readBuffer) > 0) {
                    //将本次读取的数据存到byte流中
                    bos.write(readBuffer.array());
                    //将缓冲区清空以备下次读取
                    readBuffer.clear();
                }
                //如果byte流中存有数据
                if (bos.size() > 0) {
                    //建立一个普通字节数组存取缓冲区的数据
                    System.out.println("接收数据: " + bos);
                    //关闭客户端连接,此时服务器在read读取客户端信息的时候会返回-1
                    client.close();
                    System.out.println("连接关闭!");
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

执行流程简介


Buffer 缓冲区详解

抽象类 java.nio.Buffer 是所有 NIO 缓冲区实现类的父类。

缓冲区包括以下类型: ByteBufferCharBufferShortBufferIntBufferLongBufferFloatBufferDoubleBuffer

下面以 ByteBuffer 为例,缓冲区的创建方法用的是:

// 创建一个容量为capacity的 ByteBuffer 对象
public static ByteBuffer allocate(int capacity) {
    if (capacity < 0) throw new IllegalArgumentException();
    return new HeapByteBuffer(capacity, capacity);
}

最后还是回到 ByteBuffer 的构造方法:

// 使用给定的标记、位置、限制、容量、后备数组和数组偏移量创建一个新缓冲区
// package-private
ByteBuffer(int mark, int pos, int lim, int cap, byte[] hb, int offset) {
    super(mark, pos, lim, cap);
    this.hb = hb;
    this.offset = offset;
}

Buffer中的重要概念和字段:

  • 容量(capacity):作为一个内存块,Buffer具有一定的固定大小,也称为”容量”,缓冲区容量不能为负,并且创建后不能更改(因为它是数组)。

  • 限制(limit):表示缓冲区中可以操作数据的大小(limit后数据不能进行读写)。缓冲区的限制不能为负,并且不能大于其容量。

    • 写入模式,限制等于buffer 的容量。读取模式下,limit 等于写入的数据量。
  • 位置(position):下一个要读取或写入的数据的索引。缓冲区的位置不能为负,并且不能大于其限制

  • 标记(mark)与重置(reset):标记是一个索弓l,通过Buffer中的mark()方法指定Buffer中一个特定的 position,之后可以通过调用reset()方法恢复到这个position。

    • 标记、位置、限制、容量遵守以 T 不变式:0<=mark<=position<=limit<=capacity

缓冲区流程图解

  • capacity: 最大容量;
  • position: 当前已经读写的字节数;
  • limit: 还可以读写的字节数。

状态变量的改变过程举例:

① 新建一个大小为 8 个字节的缓冲区,此时 position 为 0,而 limit = capacity = 8。capacity 变量不会改变,下面的讨论会忽略它。

② 从输入通道中读取 5 个字节数据写入缓冲区中,此时 position 移动设置为 5,limit 保持不变。

③ 在将缓冲区的数据写到输出通道之前,需要先调用 flip() 方法,这个方法将 limit 设置为当前 position,并将 position 设置为 0。

④ 从缓冲区中取 4 个字节到输出缓冲中,此时 position 设为 4。

⑤ 最后需要调用 clear() 方法来清空缓冲区,此时 position 和 limit 都被设置为最初位置。

buffer 常见方法

方法作用
Buffer clear()清空缓冲区并返回对缓冲区的引用
Buffer flip()为将缓冲区的界限设置为当前位置,并将当前位置重置为0
int capacity()返回Buffer的capacity大小
boolean hasRemaining()判断缓冲区中是否还有元素
int limit()返回Buffer的界限(limit)的位置
Buffer limit(int n)将设置缓冲区界限为n,并返回一个具有新limit的缓冲区对象
Buffer mark()对缓冲区设置标记
int position()返回缓冲区的当前位置position
Buffer position(int n)将设置缓冲区的当前位置为n,并返回修改后的Buffer对象
int remaining()返回position和limit之间的元素个数
Buffer reset()将位置position转到以前设置的mark所在的位置
Buffer rewind()将位置设为为0.取消设置的mark

数据操作方法

取获取Buffer中的数据

  • get():读取单个字节
  • get (byte[〕dst):批量读取多个字节到dst中
  • get(int index):读取指定索引位置的字节(不会移动position)

放入数据到Buffer中

  • put(byte b):将给定单个字节写入缓冲区的当前位置
  • put (byte[] src):将src中的字节写入缓存区的当前位置
  • put(int index,byte b)L将指定字节写入缓存区的索引位置(不会移动position)

Channel 通道详解

通道(Channel):由 java.nio.channels 包定义的。Channel表示IO源与目标打开的连接。

Channel 类似于传统的“流”。只不过Channel本身不能直接访问数据,Channel只能与Buffer进行交互。

BlO 中的 stream 是单向的,只能用来读或者写,而 NIO 的 Channel 是双向的。

常用的 Channel 实现类

  • FileChannel:用于读取、写入、映射和操作文件的通道
  • DatagramChannel:通过UDP读写网络中的数据通道
  • SocketChannel:通过TCP读写网络中额数据
  • ServerSocketChannel:可以监听新进来的TCP连接,对每一个新进来的连接都会创建一个 SocketChannel。
    • 【ServerSocketChannel类似ServerSocket,SocketChannel类似Socket】

通道使用

获取通道的一种方式是对支持通道的对象调用 getChannel() 方法,支持这个方法的类如下:

  • 本地io:FileInputStreanm/FileOutputStreamRandomAccessFile

  • 网络io:SocketServerSocketDatagramSocket

文件读写

 public static void writeTxt() throws Exception {
     //1.字节输出流通向目标文件
     FileOutputStream fos = new FileOutputStream("C:\\Users\\shiva\\Desktop\\1.txt");
     //2.得到字节输出流对应的通道 Channel
     FileChannel channel = fos.getChannel();
     //3.分配缓存区
     ByteBuffer buffer = ByteBuffer.allocate(1024);
     buffer.put("缓冲区写入数据到txt!".getBytes());
     //4.把缓存区切换为写模式
     buffer.flip();
     channel.write(buffer);
     channel.close();
 }

public static void readTxt() throws Exception {
    FileInputStream is = new FileInputStream("C:\\Users\\shiva\\Desktop\\1.txt");
    FileChannel channel = is.getChannel();
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    channel.read(buffer);
    buffer.flip();
    String rs = new String(buffer.array(), 0, buffer.remaining());
    System.out.println(rs);
}

通道复制

transferFormtransferTo 两个方法,看代码就行

public static void transfer() throws Exception {
    //输入流
    FileInputStream is = new FileInputStream("C:\\Users\\shiva\\Desktop\\1.txt");
    FileChannel isChannel = is.getChannel();
    //输出流
    FileOutputStream fos = new FileOutputStream("C:\\Users\\shiva\\Desktop\\2.txt");
    FileChannel fosChannel = fos.getChannel();
    //从 XXX 复制
    fosChannel.transferFrom(isChannel, isChannel.position(), isChannel.size());
    //复制到 XXX
    //isChannel.transferTo(isChannel.position(), isChannel.size(), fosChannel);
    isChannel.close();
    fosChannel.close();
}

Selector 选择器详解

选择器(Selector)是SeIectabIe ChanneI对象的多路复用器,Selector可以同时监控多个 SelectableChannel 的IO状况,

也就是说,利用 Selector 可使一个单独的线程管理多个 Channel。

Selector 能够检测多个注册的通道上篡若有事件发生(注意:多个Channel以事件的方式可以注册到同一个 Selector);

如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。

只有在连接/通道真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程 避免了多线程之间的上下文切换导致的开销


Selector 使用

Selector selector = Selector.open();

向选择器注册通道:SelectableChannel.register(Selector sel,int ops);

ServerSocketChannel ssChannel = ServerSocketChannel.open();
ssChannel.configureBlocking(false);
ssChannel.register(selector, SelectionKey.OP_ACCEPT);

通道必须配置为非阻塞模式,否则使用选择器就没有任何意义了

因为如果通道在某个事件上被阻塞,那么服务器就不能响应其它事件,必须等待这个事件处理完毕才能去处理其它事件,显然这和选择器的作用背道而驰。

ops指定,可以监听的事件类型(用可使用Selection Key的四个常量表示):

  • 读:SelectionKey.OP_READ (1)
  • 写:SelectionKey.OP_WRITE (4)
  • 连接:SelectionKey.OP_CONNECT (8)
  • 接收:SelectionKey.OP_ACCEPT (16)
  • 若注册时不止监听一个事件,则可以使用‘位或”操作符连接。
int interestSet = selectionKey.OP_READ | SelectionKey.OP_WERITE

事件监听

int num = selector.select();

使用 select() 来监听到达的事件,它会一直阻塞直到有至少一个事件到达。

代码就不写了,上门的示例包含了全流程

参考文章