NIO、BIO、Selector
IO
什么是 IO,IO 是Input、Output的简称,即输入输出,服务端与客户端交互的过程也是一种 IO。
BIO
全称 Blocking IO,即阻塞 IO,单个线程在处理单个请求时,如果当前请求没有下一步操作,当前线程会被卡住,如果有另一个请求进来,当前线程是无法响应新来的请求的。
public static void main(String[] args) {
try {
ServerSocket serverSocket = new ServerSocket(8000);
while (true){
// accept 会阻塞
Socket accept = serverSocket.accept();
System.out.println("客户端连接成功");
byte[] bytes = new byte[1024];
// read 也会阻塞
int read = accept.getInputStream().read(bytes);
if (read != -1) {
System.out.println("收到消息:" + new String(bytes));
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
缺点:一个线程只能处理一个请求,无法应对高并发场景
NIO
全称 New IO,又称 Non-Blocking IO,即非阻塞的 IO 模型。服务器再处理 accept
和 read
等操作时,并不会阻塞当前线程,而是继续执行下面的代码。
public static List<SocketChannel> CHANNEL_LIST = new ArrayList<>();
public static void main(String[] args) {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8000));
// 配置 socket 为非阻塞
serverSocketChannel.configureBlocking(false);
while (true) {
// 接受客户端请求
SocketChannel accept = serverSocketChannel.accept();
if (accept != null) {
System.out.println("连接成功");
// 设置客户端 socket 位非阻塞 ==> 读操作是从这个 socket 里面读取
accept.configureBlocking(false);
// 把建立好连接的 socket 放入 List 中
CHANNEL_LIST.add(accept);
}
// 遍历 List
for (SocketChannel socketChannel : CHANNEL_LIST) {
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
int read = socketChannel.read(byteBuffer);
if (read > 0) {
System.out.println("收到消息:" + new String(byteBuffer.array()));
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
改进:现在一个线程可以处理多个请求
缺点:1. 服务器空转,当没有accept
或read
操作时,浪费 CPU 资源。
2. 已建立连接的 Socket List 无法快速定位当前发送数据的 socket
Selector
public static void main(String[] args) {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8000));
// 设置为非阻塞
serverSocketChannel.configureBlocking(false);
// 创建 Selector ==> epoll_create
Selector selector = Selector.open();
// 将 server socket 注册 ACCEPT 事件到 Seletor 中 ==> epoll_ctl
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 获取事件 ==> epoll_wait 如果此时这个 Selector 监听的 socket 没有事件发生,则会挂起
// 处理方式类似 阻塞队列中的 poll,队列中没数据时会 park 当前线程,来了数据会 unpark 当前线程
// 阻塞队列中的数据写入是其他线程操作的,而 epoll 中的事件写入是系统层面进行的
selector.select();
// 获取有事件产生的 Keys
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 遍历 Keys
if (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
// 如果当前事件是连接事件
if (selectionKey.isAcceptable()) {
ServerSocketChannel serverChannel = (ServerSocketChannel) selectionKey.channel();
// 建立连接
SocketChannel accept = serverChannel.accept();
// 配置通道为非阻塞
accept.configureBlocking(false);
// 注册当前通道到 selector 中,事件为 read
accept.register(selector, SelectionKey.OP_READ);
SocketAddress remoteAddress = accept.getRemoteAddress();
System.out.println("客户端" + remoteAddress + "连接成功");
} else
// 如果事件是读取事件(即客户端发送了数据)
if (selectionKey.isReadable()) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
// 读取数据
int read = channel.read(byteBuffer);
if (read > 0) {
System.out.println("收到消息:" + new String(byteBuffer.array()));
}
}
iterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
EpollSelectorImpl
linux 内核函数
- epoll_create:创建一个epoll的句柄
- epoll_ctl:向epoll中注册事件
- epoll_wait:返回 epoll 中已注册的事件
EPollSelectorProvider.openSelector()
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
// 创建一个装 socket 句柄的数组
pollWrapper = new EPollArrayWrapper();
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<>();
}
void initInterrupt(int fd0, int fd1) {
outgoingInterruptFD = fd1;
incomingInterruptFD = fd0;
//将管道的读取端注册
epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}
EpollSelector.implRegister
protected void implRegister(SelectionKeyImpl ski) {
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
int fd = Integer.valueOf(ch.getFDVal());
fdToKey.put(fd, ski);
// fd 为当前 socket
pollWrapper.add(fd);
keys.add(ski);
}
EpollSelector.doSelect()
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
processDeregisterQueue();
try {
begin();
// 从 socket 数组中取出事件 这一步如果没有事件更新就会挂起
pollWrapper.poll(timeout);
} finally {
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
int poll(long timeout) throws IOException {
//更新epoll事件,实际调用`epollCtl`加入到epollfd中
updateRegistrations();
//获取已就绪的文件句柄
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
//如是唤醒文件句柄,则跳过,设置interrupted=true
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
private void updateRegistrations() {
synchronized (updateLock) {
int j = 0;
while (j < updateCount) {
int fd = updateDescriptors[j];
short events = getUpdateEvents(fd);
boolean isRegistered = registered.get(fd);
int opcode = 0;
if (events != KILLED) {
//已经注册过
if (isRegistered) {
//修改或删除
opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
} else {
//新增
opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
}
if (opcode != 0) {
epollCtl(epfd, opcode, fd, events);
if (opcode == EPOLL_CTL_ADD) {
//增加到registered缓存是否已注册
registered.set(fd);
} else if (opcode == EPOLL_CTL_DEL) {
registered.clear(fd);
}
}
}
j++;
}
updateCount = 0;
}
}