介绍了 Java 的传统 I/O ,也就是 B-I/O (Blocking IO)。这篇文章介绍一下 N-I/O (Non-Block)的基本知识点,以及为什么 N-I/O 在高并发以及大文件的处理方面更有优势。
本地文件I/O操作——NIO小试牛刀
Channel和Buffer
BIO 里操作的是 InputStream 和 OutputStream ,在 NIO 中操作的则是 Channel 和 Buffer 。我们可以把 Channel 想象成矿藏,把 Buffer 想象成运矿的车。如果想移动数据,必须借助 Buffer ,这是移动数据的唯一方式。也就是说 Buffer 跟 Channel 必定形影不离。
NIO 中用的最多的三种 Channel ,分别是 FileChannel , SocketChannel ,以及 ServerSocketChannel 。 FileChannel 是用来操作本地文件的,而另外两个则是进行网络 I/O 操作的。
FileChannel 这里通过将文件 test-io.tmp 里面的内容移动到文件 test-io.md 中,让大家感受一下如何使用 Channel 和 Buffer 进行文件 I/O 操作。
示例: NIO 方式操作本地文件。
//通过FileInputstream拿到输入FileChannel。
FileChannel in = new FileInputStream ("test-io.tmp").getChannel();
//通过 File OutPutStream拿到输出FileChannel
FileChannel out = new FileOutputStream("test-io.md").getChannel();
//创建一个字节缓冲器,用于运送数据。
ByteBuffer buffer = ByteBuffer.allocate();
while (in.read(buffer) != -){
//相当于缓冲器的开关,只有调用该方法,缓冲器里面的数据才能被写入到输出Channel.
buffer.flip();
out.write(buffer);
buffer.clear();
}
上面的代码很轻松地实现了,将文件 test-io.tmp 中的内容移动到 test -io.md 中。
代码解读
通过 FileInputStream 对象的 getChannel 方法拿到了 Channel 。
通过 byte Buffer 的 allocate 方法(也可以是 allocateDirecty 方法)声明一个缓冲器,容量是 1024 字节,用于传输数据。
将数据源 channel 里面的数据通过 read 方法读取到缓冲器。 通过 out.write() 方法,将缓冲器里面的数据写入到输出 Channel 。最后清空缓冲器,为下次读取数据做准备。
ByteBuffer
ByteBuffer 是 Buffer 的一个子类。还有很多其它子类,比如 CharBuffer , DoubleBuffer 等 , ByteBuffer 是用的最多的缓冲器。
我们可以把 Byte Buffer 想象成一个字节数组。大概是这个样子。
上图是刚刚初始化的示意图,position表示游标,每读取一个字节,position就移动一个位置。
ByteBuffer 有几个比较重要的方法,如下
allocate() : 创建一个缓冲器,例如 ByteBuffer.allocate(1024) 。 allocateDirect() : 创建一个与操作系统底层更 耦合 的缓冲器。 capacity() : 返回缓冲区数组的容量。 position() : 下一个要操作的元素位置。 limit() : 返回limit的值。 flip() :打开缓冲器的阀门,做好被读取的准备。 put() :将字节存储进缓冲器。例如 byteBuffer.put(“hello”.getBytes(“utf-8”)); wrap() :将字节数组存储进缓冲器。例如 ByteBuffer.wrap(“hello”.getBytest()) rewind() :将position设置为0。 clear() :清空缓冲区。 hasRemaining() 若介于position和limit之间有值,则返回true。
零拷贝
上面的例子还有另外一种实现,看代码。
public class ChannelTransfer {
public static void main(String[] args) throws Exception {
FileChannel in = new FileInputStream("test-io.tmp").getChannel();
FileChannel out = new FileOutputStream("test-io.md").getChannel();
in.transferTo(,in.size(),out);
//或者
//out.transferFrom(in,,in.size());
}
}
直接将输入端和输出端进行对接,不经过操作系统的内核态。这就是大名鼎鼎的 零拷贝 技术的运用。 Kafka 的性能之所以那么生猛,很大一部分原因是运用了零拷贝技术。
超大内存文件读取
所谓超大文件就是,要操作的文件比你系统的可用内存还大,此时可以使用 NIO 提供的类库方法进行如下操作。
public static void main(String[] args) throws Exception {
FileChannel fileChannel = new FileInputStream("test-io.tmp").getChannel();
//通过map()方法产生一个缓冲器.
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY,, fileChannel.size());
if ( map pedByteBuffer != null){
CharBuffer charBuffer = Charset.forName("utf-").decode(mappedByteBuffer);
System.out.println(charBuffer.toString());
}
}
注意map()函数有三个参数,分别表示读写模式,初始位置以及映射长度。 因为我的测试文件很小,所以就全部映射了。如果 源文件 较大(100G)可以每次映射500M或1G,根据机器性能不同找到一个最优值。
FileChannel 的知识点基本就这些了,相信通过上面的介绍,各位对 NIO 的 Channel 和 Buffer 已经有了一个基本的认识。
网络I/O——NIO大显身手。
我们一直在说 NIO 是非阻塞 I/O ,但是上面介绍的 FileChanel 并不能设置成非阻塞模式,你说搞笑不。 FileChannel 相比于传统的(BIO)来说,最大的优势在于大文件的处理,以及零拷贝等技术的运用和处理。如果你问我底层实现原理是什么,其实我也不知道,只知道 FileChannel 提供的很多方法,以一种更迎合操作系统的方式来工作。所谓马屁拍得好,升职加薪来得早。
如果各位真想深究底层原理,建议先去了解操作系统的知识,然后再去扒 JDK 的源码。
真正支持非阻塞操作的是 server SocketChannel 和 SocketChannel 。也只有在进行网络 I/O 的时候,非阻塞 I/O 的优势才能被最大程度地发挥出来。
如果想了解各种 I/O 的详细内容可以看我这篇文章。
需求提出
假设我们要实现一个简单的服务端程序,唯一的功能是接收客户端发过来的请求,然后将请求内容转换为大写之后再发回给客户端。
BIO 实现方式
当客户端发送一个请求的时候,服务端则创建一个线程进行处理。当客户端同时发送100个请求的时候,服务端就创建100个线程进行处理。这看起来还不错,但如果请求数量有几千或者更高的时候,那么服务端可能就会有点儿吃不消了。
原因如下:
- 线程的创建和销毁很占用系统资源,即便有 线程池 技术,也不能从根本上解决问题,而且在Linux里面线程就是轻量级进程
- 线程不可以无限制的创建下去,Java里面每个线程要占用512K-1M的内存空间。
- 线程间的不断切换很消耗系统资源,因为要保留上下文等内容。
BIO是个实在孩子。
BIO 选择 多线程 的方式也是无奈之选。因为 Socket.write 和 Socket.read 都是阻塞的。所谓的阻塞的意思就是一旦线程开始执行 socket.read 操作了,那么就需要等这个读操作执行完成。如果这个时候没有数据可以读,那么就需要等待,等到有数为止。这是 BIO 的天然属性,没有办法,简直太实在了。所以如果想充分地利用 CPU ,就得多创建几个线程,一个线程没有数据,另外一个总有吧,这就叫东方不亮西方亮。
来一段简简单单的 伪代码 ,大家稍微感受一下吧。
//整个线程池
ExecutorService executor = Executors.newFixedThreadPool();
Server socket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress());
//循环监听等待新连接到来
while(true) {
Socket socket = server Socket .accept();
//为新的连接创建新的线程
executor.submit(new Task(socket));
}
class Task implements Runnable {
private Socket socket;
public Task(Socket socket) {
this.socket = socket;
}
@ Override
public void run() {
while (!socket.isClosed()) {
//读数据,阻塞
String someThing = socket.read();
if (someThing != null) {
//处理数据,返回客户端,阻塞
socket.write();
}
}
}
}
NIO是个聪明孩子。
BIO 的问题出在了阻塞的读和写上面。因为阻塞 I/O 太实在,没有数据就死等数据,造成 CPU 没有被充分利用的尴尬局面。相比于 BIO , NIO 就聪明多,因为它根本就不会等,而是有数据的时候,你通知我一下,我派 CPU 去取。到哪儿就取,取完就走,一点儿不废话,速度那叫一个快。以 CPU 的(智商)运算速度,一个人管理几千个通道根本不是事儿。这就是 Reactor 编程模型,也叫基于事件编程。
既然是基于事件编程,那么 NIO 里面比较重要的几个事件分别是, Read , write , Accept , Connect 。
在 NIO 编程模型中,每个客户端跟服务端建立的连接都是一个 Channel ,这些 Channel 一旦有数据了,就会通知 CPU 去对应的通道取数。所以根本不会像 BIO 那样,发生 线程 死等数据的情况。这也就是 CPU 利用高的原因。
NIO的网络编程模型有点儿类似于孙悟空的悬丝诊脉。
使用NIO进行网络编程
上面提到了,NIO网络编程是基于事件编程,那么就得有人负责事件的监听。这个工作由 Select 完成。当有感兴趣的事情发生, Select 就会第一时间知道。
SelectionKey 也是一个相当重要的角色,相当于 Select 和 Channel 沟通的桥梁。因为 Select 不光要知道有感兴趣的事情发生了,还要知道哪个 Channel 发生了什么事件。
NIO 网络编程里面的主角就给大家都介绍完了,分别是选择器 Selector ,通道 ServerSocketChannel 和 SocketChanel ,以及在上面提到的缓冲器 ByteBuffer ,还有 SelectionKey 。
下面给大家简单演绎一下,如何用NIO的方式,实现上文中提到的那个服务端程序。先看代码吧。
public class EchoNioServer {
public static final int BUF_SIZE =;
public static void main(String[] args) {
ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE);
try {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress());
System.out.println("正在端口监听...");
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, byteBuffer);
while (true) {
selector.select();
Iterator <SelectionKey> iterator = selector.selectedKeys(). iterator ();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ, byteBuffer);
} else if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer readBuffer = (ByteBuffer) key.attachment();
readBuffer.clear();
socketChannel.read(readBuffer);
readBuffer.flip();
System.out.println("received from client: " + new String(readBuffer.array()).trim());
socketChannel.register(selector, SelectionKey.OP_WRITE, readBuffer);
} else if (key.isWritable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer writeBuffer = (ByteBuffer) key.attachment();
String msg = new String(writeBuffer.array()).trim().toUpperCase();
writeBuffer.clear();
writeBuffer.put(msg.getBytes("utf-"));
writeBuffer.flip();
socketChannel.write(writeBuffer);
writeBuffer.clear();
socketChannel.close();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
代码解读
帮大家做个简单的解读。方便大家理解。
- 先创建一个选择器及缓冲器备用,一个用于监听感兴趣的事件,一个用于运送数据。
Selector select = Selector.open(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); 2. 创建一个 ServerSocketChannel ServerSocketChannel ssc = ServerSocketChannel.open(); 3. 设置为非阻塞模式(必须设置为非阻塞,不然你还是什么NIO) ssc.configureBlocking(false) 4. 绑定端口 ssc.bind(8888) 5. 将通道注册到选择器,并告诉选择器,我对哪些些事件感兴趣。当事件到来就调用相应的逻辑进行处理。 sss.register(select,SelectionKey.Accept) 6. 调用 select.selct() 方法,找出可用的通道,这个方法是阻塞的,所以放到while(true)也不会造成CPU空转。 7. 针对不同的事件做不同的处理。
与上面服务端代码配套的客户端代码,我就不做过多解释了。
public class EchoNioClient {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool();
for (int i =; i < 10; i++) {
executor.submit(new Task());
}
executor.shutdown();
}
}
class Task implements Runnable {
InetSocketAddress remoteAddress = new InetSocketAddress();
static final int BUF_SIZE =;
@Override
public void run() {
try {
String msg = "hello I'm " + Thread.currentThread().getName();
SocketChannel socketChannel = SocketChannel.open(remoteAddress);
ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE);
byteBuffer.clear();
byteBuffer.put(msg.getBytes("utf-"));
byteBuffer.flip();
socketChannel.write(byteBuffer);
byteBuffer.clear();
ByteBuffer receiveBuffer = ByteBuffer.allocate();
while (socketChannel.read(receiveBuffer) != -) {
receiveBuffer.flip();
System.out.println("received from server: " + new String(receiveBuffer.array()).trim());
receiveBuffer.clear();
}
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
结束
希望这篇文章能帮助你更好地理解NIO基础编程。了解了这些基础知识之后,无聊的时候就可以去看看Tomcat的源码,有机会也可以跟那些经常用Netty写高性能网关服务的大牛聊聊天了。
最后强烈建议各位,把文中的例子放到自己的IDE里面,跑一遍,最好自己再动手写一写,千万不要一看我都会,一写就蒙圈,眼高手低可是大忌。