NIO、BIO、Selector

Java
435
0
0
2022-09-02

NIO、BIO、Selector

IO

什么是 IO,IO 是Input、Output的简称,即输入输出,服务端与客户端交互的过程也是一种 IO。

BIO

全称 Blocking IO,即阻塞 IO,单个线程在处理单个请求时,如果当前请求没有下一步操作,当前线程会被卡住,如果有另一个请求进来,当前线程是无法响应新来的请求的。

Laravel

 public static void main(String[] args) {
        try {
            ServerSocket serverSocket = new ServerSocket(8000);

            while (true){
                // accept 会阻塞
                Socket accept = serverSocket.accept();
                System.out.println("客户端连接成功");
                byte[] bytes = new byte[1024];
                // read 也会阻塞 
                int read = accept.getInputStream().read(bytes);

                if (read != -1) {
                    System.out.println("收到消息:" + new String(bytes));
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
缺点:一个线程只能处理一个请求,无法应对高并发场景

NIO

全称 New IO,又称 Non-Blocking IO,即非阻塞的 IO 模型。服务器再处理 acceptread 等操作时,并不会阻塞当前线程,而是继续执行下面的代码。

  public static List<SocketChannel> CHANNEL_LIST = new ArrayList<>();

    public static void main(String[] args) {

        try {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(8000));
            // 配置 socket 为非阻塞
            serverSocketChannel.configureBlocking(false);

            while (true) {
                // 接受客户端请求
                SocketChannel accept = serverSocketChannel.accept();
                if (accept != null) {
                    System.out.println("连接成功");
                    // 设置客户端 socket 位非阻塞 ==> 读操作是从这个 socket 里面读取
                    accept.configureBlocking(false);
                    // 把建立好连接的 socket 放入 List 中
                    CHANNEL_LIST.add(accept);
                }

                // 遍历 List 
                for (SocketChannel socketChannel : CHANNEL_LIST) {
                    ByteBuffer byteBuffer = ByteBuffer.allocate(128);
                    int read = socketChannel.read(byteBuffer);

                    if (read > 0) {
                        System.out.println("收到消息:" + new String(byteBuffer.array()));
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

Laravel

改进:现在一个线程可以处理多个请求
缺点:1. 服务器空转,当没有 acceptread 操作时,浪费 CPU 资源。
2. 已建立连接的 Socket List 无法快速定位当前发送数据的 socket

Selector

 public static void main(String[] args) {
        try {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(8000));
            // 设置为非阻塞
            serverSocketChannel.configureBlocking(false);
            // 创建 Selector ==> epoll_create 
            Selector selector = Selector.open();
            // 将 server socket 注册 ACCEPT 事件到 Seletor 中  ==> epoll_ctl
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                // 获取事件 ==> epoll_wait 如果此时这个 Selector 监听的 socket 没有事件发生,则会挂起 
                // 处理方式类似 阻塞队列中的 poll,队列中没数据时会 park 当前线程,来了数据会 unpark 当前线程 
                // 阻塞队列中的数据写入是其他线程操作的,而 epoll 中的事件写入是系统层面进行的
                selector.select();
                // 获取有事件产生的 Keys
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                // 遍历 Keys 
                if (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    // 如果当前事件是连接事件 
                    if (selectionKey.isAcceptable()) {
                        ServerSocketChannel serverChannel = (ServerSocketChannel) selectionKey.channel();
                        // 建立连接 
                        SocketChannel accept = serverChannel.accept();
                        // 配置通道为非阻塞
                        accept.configureBlocking(false);
                        // 注册当前通道到 selector 中,事件为 read
                        accept.register(selector, SelectionKey.OP_READ);
                        SocketAddress remoteAddress = accept.getRemoteAddress();
                        System.out.println("客户端" + remoteAddress + "连接成功");
                    } else 
                    // 如果事件是读取事件(即客户端发送了数据) 
                    if (selectionKey.isReadable()) {
                        SocketChannel channel = (SocketChannel) selectionKey.channel();
                        ByteBuffer byteBuffer = ByteBuffer.allocate(128);
                        // 读取数据 
                        int read = channel.read(byteBuffer);

                        if (read > 0) {
                            System.out.println("收到消息:" + new String(byteBuffer.array()));
                        }
                    }
                    iterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

Laravel

EpollSelectorImpl

linux 内核函数

  • epoll_create:创建一个epoll的句柄
  • epoll_ctl:向epoll中注册事件
  • epoll_wait:返回 epoll 中已注册的事件

EPollSelectorProvider.openSelector()

 EPollSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    long pipeFds = IOUtil.makePipe(false);
    fd0 = (int) (pipeFds >>> 32);
    fd1 = (int) pipeFds;
    // 创建一个装 socket 句柄的数组
    pollWrapper = new EPollArrayWrapper();
    pollWrapper.initInterrupt(fd0, fd1);
    fdToKey = new HashMap<>();
}
void initInterrupt(int fd0, int fd1) {
    outgoingInterruptFD = fd1;
    incomingInterruptFD = fd0;
    //将管道的读取端注册
    epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}

EpollSelector.implRegister

protected void implRegister(SelectionKeyImpl ski) {
    if (closed)
        throw new ClosedSelectorException();
    SelChImpl ch = ski.channel;
    int fd = Integer.valueOf(ch.getFDVal());
    fdToKey.put(fd, ski);
    // fd 为当前 socket
    pollWrapper.add(fd);
    keys.add(ski);
}

EpollSelector.doSelect()

protected int doSelect(long timeout) throws IOException {
    if (closed)
        throw new ClosedSelectorException();
    processDeregisterQueue();
    try {
        begin();
        // 从 socket 数组中取出事件 这一步如果没有事件更新就会挂起
        pollWrapper.poll(timeout);
    } finally {
        end();
    }
    processDeregisterQueue();
    int numKeysUpdated = updateSelectedKeys();
    if (pollWrapper.interrupted()) {
        // Clear the wakeup pipe
        pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
        synchronized (interruptLock) {
            pollWrapper.clearInterrupted();
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
    return numKeysUpdated;
}

int poll(long timeout) throws IOException {
    //更新epoll事件,实际调用`epollCtl`加入到epollfd中
    updateRegistrations();
    //获取已就绪的文件句柄
    updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
    //如是唤醒文件句柄,则跳过,设置interrupted=true 
    for (int i=0; i<updated; i++) {
        if (getDescriptor(i) == incomingInterruptFD) {
            interruptedIndex = i;
            interrupted = true;
            break;
        }
    }
    return updated;
}

private void updateRegistrations() {
    synchronized (updateLock) {
        int j = 0;
        while (j < updateCount) {
            int fd = updateDescriptors[j];
            short events = getUpdateEvents(fd);
            boolean isRegistered = registered.get(fd);
            int opcode = 0;

            if (events != KILLED) {
                //已经注册过 
                if (isRegistered) {
                    //修改或删除
                    opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
                } else {
                    //新增
                    opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
                }
                if (opcode != 0) {
                    epollCtl(epfd, opcode, fd, events);
                    if (opcode == EPOLL_CTL_ADD) {
                        //增加到registered缓存是否已注册
                        registered.set(fd);
                    } else if (opcode == EPOLL_CTL_DEL) {
                        registered.clear(fd);
                    }
                }
            }
            j++;
        }
        updateCount = 0;
    }
}

NIO、BIO、Selector