关于 NIO 你不得不知道的一些“地雷”

本文是笔者在学习NIO过程中发现的一些比较容易让人忽略的知识的一个总结,而这些让人忽略的小细节恰恰是NIO网络编程中必不可少。虽然现在我们不会直接编写NIO来完成我们的网络层通讯,而是使用成熟的基于NIO的网络框架来实现我们的网络层。如,netty、mina。但对NIO网络编程过程的了解,非常有助于我们更深入的理解netty、mina等网络框架,以至于能更好的使用它们。

因此,本文并不对NIO的一些基层知识做过多的介绍,主要侧重于NIO编程中细节的讲解。

NIO VS IO

  • 标准的IO基于字节流和字符流进行操作的;而NIO是基于通道(Channel)进行操作的。
  • 通道是双向的,既可以写数据到通道,又可以从通道中读取数据;而流的读写通常是单向的,要么是输入流,要么是输出流,不能既是输入流又是输出流。
  • NIO能够实现非阻塞的网络通信,而IO只能实现阻塞式的网络通信。

Buffer

Java NIO中的Buffer用于和NIO通道进行交互。数据总是从通道读取到缓冲区,或者从缓冲区写入到通道中。
Buffer是一个特定的原生类型数据容器。
Buffer是一种特定的原生类型的线程的、有限的元素序列。除了它的内容之外,一个Buffer一个重要的本质属性是它的capacity、limit、和position;

  • capacity:一个buffer的capacity指的就是它所包含的元素的个数。buffer的capacity永远不会是负数,且永远不会变化。
  • limit:一个buffer的limit指的是不应该被读或写的第一个元素的索引( position <= limit )。一个buffer的limit永远不会是负数的,并且永远不会超过它的capacity。
  • position:一个buffer的position指的是下一个将要被读或写的元素的索引。一个buffer的position永远不会是负数的,并且永远不会超过它的limit( 这里也说明,position最多等于limit,当position==limit时,这个时候是不能够在从buffer中读取到数据了 )。

数据操作:

Buffer的每一个子类都定义了两类get和put操作。

  • 相对操作:读或写 一个或多个元素 从当前position位置开始并且会根据转换元素数量增加position的值。如果要求的转换超过了limit,那么一个相关的get操作会抛出BufferUnderflowException,一个相关的put操作会抛出一个BufferOverflowException,无论是这两个哪种情况发生,都不会有数据被传递。
  • 绝对操作:会接受一个显示元素的索引并且不会影响position。如果索引参数超过了limit,那么绝对的get和put操作会抛出一个IndexOutOfBoundsException异常。

不变性:

0 <= mark <= position <= limit <= capacity

线程安全性:

buffer在多线程并发下并不是安全的。如果一个buffer会在多个线程使用,那么需要使用恰当的同步操作来访问buffer。也就是buffer本身并不是线程安全的。

Java NIO 内存分配

  • Heap buffer :堆栈的内存分配。堆栈就是Java内存模型当中内存的区域,位于堆上,堆是我们生成对象的区域。
  • Direct buffer :堆外内存分配。这个内存本身不是由JVM进行控制的,它是由操作系统进行统一的处理的。通过这种直接的缓冲就能实现zero-copy(零拷贝)的动作。 [ 关于堆外内存可详见:堆外内存 之 DirectByteBuffer 详解 ]

方法

  • flip()

flip方法将Buffer从写模式切换到读模式。

  • rewind()

rewind()方法将position设回0,limit保持不变,所以你可以重读Buffer中的所有数据。可见在调用rewind()之前Buffer已经是处于读模式了

  • clear()

让Buffer重新准备好重头开始再次被写入。该方法会将position、limit重置。如果此时还没有读取的数据,则就无法读取到了。虽然clear()不会清楚数据,但是position、limit标志位被重置了,所以无法找到哪些未读取数据的位置了。

  • compact()

compact()方法将所有未读的数据拷贝到Buffer起始处。然后将position设到最后一个未读元素正后面。limit属性依然像clear()方法一样,设置成capacity。现在Buffer准备好写数据了,但是不会覆盖未读的数据。

clear() VS compact()

clear只是对position、limit、mark进行重置,而compact在对position进行设置,以及limit、mark进行重置的同时,还涉及到数据在内存中拷贝。所以compact比clear更耗性能。但compact能保存你未读取的数据,将新数据追加到为读取的数据之后;而clear则不行,若你调用了clear,则未读取的数据就无法再读取到了。

  • Slice Buffer与原有buffer共享相同的底层数据

ByteBuffer.slice(start, end) —————— [start, end),即包含start,不包含end
slice返回的ByteBuffer底层数据和源ByteBuffer是共享的,所以无论对那个buffer进行修改,都会影响到另一buffer。

  • buffer.asReadOnlyBuffer()

只读buffer适用于方法传递时,你只希望你的调用端去读取你所提供的buffer。即,将一个只读buffer当做参数传递给某个方法。

  • ByteBuffer.wrap(byte[] array)

该方法生成的ByteBuffer底层就是你传进来的这个array数组,并没有进行数组拷贝,所以是和你传进来的array共享内容的。这也导致如果你修改了传进来的array数组的内容,是会反映到ByteBuffer的。

  • 关于Buffer的Scattering与Gathering

Scattering:允许read的时候传递一个buffer[]数组。将一个Channel中的数据给读到了多个buffer当中,它是按照顺序依次读入buffer当中的,而且总是当当前buffer已经写满了才会写下一个buffer。
Gathering:允许write的时候传递一个buffer[]数组。将多个buffer的数据写到一个Channel中。它会将第一个buffer中可读的数据都写入channel后,再将下一个buffer中的数据写入到channel中,以此依次将buffer中可读取的数据写到channel中。
Scattering与Gathering适用于网络操作中的自定义协议。比如,一个请求中带有两个请求头以及一个body,第一个请求头的数据长度固定是10个byte,第二个请求头的数据长度固定是5个byte,而body的长度是不确定的。那么我们就可以用3个buffer组成的数组来接这样的请求。bytebuffer[]数组中,第一个bytebuffer元素的容量为10,用于接受第一个请求头的信息;第二个bytebuffer元素的容量为5,用于接受第二个请求头的信息;第三个定义一个大容量的bytebuffer用于接受body的信息。这样就天然的实现了一种数据的分门别类。

Selector

为什么使用Selector?

仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道。因为对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源。因此,使用的线程越少越好。

selector的非阻塞模式

与Selector一起使用时,Channel必须处于非阻塞模式下。这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式。而套接字通道都可以。

方法

  • wakeUp()

如果有其它线程调用了wakeup()方法,但当前没有线程阻塞在select()方法上,下个调用select()方法的线程会立即”醒来(wake up)”。

  • close()

用完Selector后调用其close()方法会关闭该Selector,即使注册到该Selector上的所有SelectionKey实例无效。通道本身并不会关闭。

linux下Selector底层是通过epoll来实现的,当创建好epoll句柄后,它就会占用一个fd值,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

关于selector的详细实现可见浅谈 Linux 中 Selector 的实现原理

SocketChannel

Java NIO中的SocketChannel是一个连接到TCP网络套接字的通道。

方法

  • connect()

如果这个channel是非阻塞模式的,那么该方法的调用将启动一个非阻塞的操作。如果连接立即建立,当在连接一个本地地址时会发生,那么该方法会返回true。否则若连接还未建立该方法会返回一个false,并且连接操作最后必须通过调用finishConnect方法来完成。
这个方法可能在任何时候被调用。如果在该方法调用时,对应的channel执行了read或write操作,那么read或write操作将先会被阻塞直到connect操作完成。如果连接尝试启动但是失败了,也就是说,如果connect方法的调用抛出了一个检查异常,那么该通道将被关闭。
写了代码测试了下,无论是是本机,还是跨机器调用,都是返回false。

  • finishConnect()

通过设置一个socket为非阻塞模式来开启一个非阻塞连接操作,然后调用该socket的connect方法。一旦连接建立,或者尝试连接失败,那么socket channel将变为可连接的并且该方法可能被调用已完成连接的后续事件。如果连接操作失败,则调用该方法将导致一个相关的IOException异常被抛出。
如果这个channel已经连接了,那么调用该方法不会阻塞并会立即返回true。如果这个channel是非阻塞模式的,那么该方法将返回false如果连接操作还没完成。如果这个channel是阻塞模式的,那么该方法将会阻塞直到连接成功或失败,如果连接成功则返回true,否则将抛出一个检查异常以描述失败。
这个方法可能在任何时候被调用。如果在该方法调用时,对应的channel执行了read或write操作,那么read或write操作将先会被阻塞直到connect操作完成。如果了解尝试启动但是失败了,也就是说,如果connect方法的调用抛出了一个检查异常,那么该通道将被关闭。

  • isConnectionPending()

告知这个channel是否正在进行连接操作。
仅当这个channel的连接操作已经启动,但是还没完成( 用通过调用finishConnect方法来完成 )。

示例:

无论如何在connect后finishConnect()sorry 方法都是需要被调用的。调用finishConnect()的三种返回:

① 如果你在connect()后直接调用了finishConnect()( 并非在CONNECT事件中调用 ),则若finishConnect()返回了true,则表示channel连接已经建立,而且CONNECT事件也不会被触发了。
② 如果finishConnect()方法返回false,则表示连接还未建立好。那么就可以通过CONNECT事件来监听连接的完成。当然也可以像上面的写法,无论如何都会给SocketChannel注册CONNECT事件,finishConnect()方法的调用放到CONNECT事件处理中调用。
③ 如果finishConnect()方法抛出了一个IOException异常,则表示连接操作失败。

支持的事件:SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE

ServerSocketChannel

Java NIO中的 ServerSocketChannel 是一个可以监听新进来的TCP连接的通道, 就像标准IO中的ServerSocket一样。

支持的事件:SelectionKey.OP_ACCEPT

ServerSocketChannel & SocketChannel

关于selectedKey集合的处理

对于已经处理的SelectionKey需要充selectedKey集合中移除,如果不将已经处理的SelectionKey从selectedKey集合中移除,那么下次有新事件到来时,在遍历selectedKey集合时又会遍历到这个SelectionKey,这个时候就很可能出错了。比如,如果没有在处理完OP_ACCEPT事件后将对应SelectionKey从selectedKey集合移除,那么下次遍历selectedKey集合时,处理到到该SelectionKey,相应的ServerSocketChannel.accept()将返回一个空(null)的SocketChannel。

关于OP_WRITE事件:

OP_WRITE事件的就绪条件并不是发生在调用channel的write方法之后,而是在当底层缓冲区有空闲空间的情况下。因为写缓冲区在绝大部分时候都是有空闲空间的,所以如果你注册了写事件,这会使得写事件一直处于就就绪,选择处理现场就会一直占用着CPU资源。所以,只有当你确实有数据要写时再注册写操作,并在写完以后马上取消注册。
其实,在大部分情况下,我们直接调用channel的write方法写数据就好了,没必要都用OP_WRITE事件。那么OP_WRITE事件主要是在什么情况下使用的了?

其实OP_WRITE事件主要是在发送缓冲区空间满的情况下使用的。如:

while (buffer.hasRemaining()) {
     int len = socketChannel.write(buffer);   
     if (len == 0) {
          selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE);
          selector.wakeup();
          break;
     }
}

当buffer还有数据,但缓冲区已经满的情况下,socketChannel.write(buffer)会返回已经写出去的字节数,此时为0。那么这个时候我们就需要注册OP_WRITE事件,这样当缓冲区又有空闲空间的时候就会触发OP_WRITE事件,这是我们就可以继续将没写完的数据继续写出了。

而且在写完后,一定要记得将OP_WRITE事件注销:
selectionKey.interestOps(sk.interestOps() & ~SelectionKey.OP_WRITE);
注意,这里在修改了interest之后调用了wakeup();方法是为了唤醒被堵塞的selector方法,这样当while中判断selector返回的是0时,会再次调用selector.select()。而selectionKey的interest是在每次selector.select()操作的时候注册到系统进行监听的,所以在selector.select()调用之后修改的interest需要在下一次selector.select()调用才会生效。

关于远端关闭事件

SelectionKey并没有提供关闭事件,其实通过OP_READ是可以监听到远端的关闭操作的。
当OP_READ事件触发使,int readByteNum = channel.read(buffer)会返回从channel读取到的字节数。
① 当readByteNum > 0 时,表示从channel读取到了readByteNum个字节到buffer中。
② 当readByteNum == 0 时,表示channel中已经没有数据可以读取了,这个时候buffer的position==limit。
③ 当 readByteNum == -1 时,表示远端channel正常关闭了。这个时候我们就需要进行该通道的关闭和注销操作了。
netty源码中OP_READ事件也会根据读取到的字节数为-1时,进行channel的关闭操作。

这里closeOnRead(pipeline)方法最终会调用channel.close()方法来完成tcp套接字的关闭(这点下面会详细说明)

如何正确的关闭一个已经注册的SelectableChannel了?

需要调用channel.close()

最终调用的会使AbstractInterruptibleChannel的close方法

总归来说,调用channel.close()方法:

① 能够调动channel对应的SelectionKey的cancel()方法使该SelectionKey加到Selector的cancel selectionKey set集合中,这样在下一次selector的时候,就会将其从selector中相关的selectionKey集合中移除,并且不会监听该selectionKey所感兴趣的事件了。
② 会关闭底层的套接字连接。
这里注意:如果只是通过调用SelectionKey.cancel()来注销一个远端已经关闭了的channel,是一个不对的方法。因为selector.select()在处理cancel selectionKey set(注销的SelectionKey集合)的时候,会判断若该SelectionKey对应的channel已经没有注册到其他的selector,并且该channel open表示为false的情况下,才会去调用底层套接字的关闭操作。所以如果之调用SelectionKey.cancel()来注销一个远端已经关闭了的channel,会导致本段的TCP连接处于“CLOSE_WAIT”状态,一直在等待程序调用套接字的关闭。

补充:channel的open标志,只有在下面两种情况下才会将open置为false。

a) 调用了channel.close()方法;

b) 或者操作channel读/写的当前线程发生中断时。

参考

  • 圣思园netty课程


相关文章

发表评论

Comment form

(*) 表示必填项

还没有评论。

跳到底部
返回顶部