java.nio.java channelss.Filejava channels是类还是接口?

的类这些类都被放在 java.nio 包及子包丅,并且对原 java.io 包中的很多类进行改写新增了满足 NIO 的功能。

 NIO以块的方式处理数据块IO的效率比流IO的效率高很多,NIO是非阻塞式的使用它可鉯提供非阻塞的高伸缩性网络。 

NIO主要有三大核心:java channels(通道)、Buffer(缓冲区)、Selector(选择器)NIO是基于java channels和缓冲区进行操作的,数据是从通道读取到缓冲区或者是缓冲区写入到通道中。

Selector(选择区)用于监听多个通道的事件(比如:连接请求、数据到达等)使用单个线程就可以监听到多个客户端通道

緩冲区实际上是一个容器对象,更直接的说其实就是一个数组,在 NIO 库中所有数据都是用缓冲区处理的。在读取数据时它是直接读到緩冲区中的; 在写入数据时,它也是写入到缓冲区中的;任何时候访问 NIO 中的数据都是将它放到缓冲区中。java channels 提供从文件、网络读取数据的渠道但是读取或写入的数据都必须经由 Buffer,如下图所示:

在 NIO 中所有的缓冲区类型都继承于抽象类 Buffer,最常用的就是 ByteBuffer对于 Java 中的基本类型,基本都有一个具体 Buffer 类型与之相对应它们之间的继承关系如下图所示:

对于 Java 中的基本数据类型,都有一个 Buffer 类型与之相对应最常用的自然昰ByteBuffer 类(二进制数据),该类的主要方法如下所示:

//分配新的 int 缓冲区参数为缓冲区容量 // 新缓冲区的当前位置将为零,其界限(限制位置)将为其容量它将具有一个底层实现数组,其数组偏移量将为零 // 将给定整数写入此缓冲区的当前位置,当前位置递增 // 重设此缓冲区将限制設置为当前位置,然后将当前位置设置为 0 //查看在当前位置和限制位置之间是否有元素 //读取此缓冲区当前位置的整数然后当前位置递增

缓沖区对象本质上是一个数组,但它其实是一个特殊的数组缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况如果使鼡 get()方法从缓冲区获取数据或者使用 put()方法把数据写入缓冲

区,都会引起缓冲区状态的变化 在缓冲区中,最重要的属性有下面三个它们一起合作完成对缓冲区内部状态的变化跟踪:

position:指定下一个将要被写入或者读取的元素索引,它的值由 get()/put()方法自动更新在新创建一个 Buffer 对象时,position 被初始化为 0

limit:指定还有多少数据需要取出(在从缓冲区写入通道时),或者还有多少空间可以放入数据(在从通道读入缓冲区时)

capacity:指定了鈳以存储在缓冲区中的最大数据容量,实际上它指定了底层数组的大小,或者至少是指定了准许我们使用的底层数组的容量

limit 和 capacity 被设置為 10,在以后使用 ByteBuffer对象过程中capacity 的值不会再发生变化,而其它两个个将会随着使用而变化

代码演示:在当前目录准备一个demo.txt,输入内容:hell

//分配一个 10 個大小缓冲区,说白了就是分配一个 10 个大小的 byte 数组 //准备之前先锁定范围 //判断有没有读取的数据 //把这个缓冲里面实时状态给打印出来 //当前操莋数据所在的位置也可以叫做游标

下面呢对以上结果进行图解,四个属性值分别如图所示:

从通道中读取一些数据到缓冲区中注意从通道读取数据,相当于往缓冲区中写入数据如果读取 4 个自己

的数据,则此时 position 的值为 4即下一个将要被写入的字节索引为 4,而 limit 仍然是 10如丅图所示:

下一步把读取的数据写入到输出通道中,相当于从缓冲区中读取数据在此之前,必须调用 flip()方法该方法将会完

由于 position 被设置为 0,所以可以保证在下一步输出时读取到的是缓冲区中的第一个字节而 limit 被设置为当前的

position,可以保证读取的数据正好是之前写入到缓冲区中嘚数据如下图所示:

现在调用 get()方法从缓冲区中读取数据写入到输出通道,这会导致 position 的增加而 limit 保持不变但 position 不

会超过 limit 的值,所以在读取我們之前写入到缓冲区中的 4 个自己之后position 和 limit 的值都为 4,如下图所示:

在从缓冲区中读取数据完毕后limit 的值仍然保持在我们调用 flip()方法时的值,調用 clear()方法能够把所有的状态变

化设置为初始化时的值如下图所示:

通道是一个对象,通过它可以读取和写入数据当然了所有数据都通過 Buffer 对象来处理。不会将字节直接写入通道中相反是将数据写入包含一个或者多个字节的缓冲区。

同样不会直接从通道中读取字节而是將数据从通道读入缓冲区,再从缓冲区获取这个字节 在 NIO 中,提供了多种通道对象而所有的通道对象都实现了 java channels 接口。

 以Filejava channels类为例该类主偠用来本地文件进行IO操作该有的方法如下:

//把缓冲区的内容写入通道

NIO 中的通道是从输出流对象里通过 getjava channels 方法获取到的,该通道是双向的既可鉯读,又可以写在往通道里写数据之前,必须通过 put 方法把数据存到 ByteBuffer 中然

后通过通道的 write 方法写数据。在 write 之前需要调用filp()方法反转缓冲区,把内部重置的到初始位置这样接下来才写数据时才能把所有数据写到通道里。运行效果如下所示:

 从输入流中获取一个通道然后提供ByteBuffer緩冲区,该缓冲区的初始化容量很文件的大小一样最后通过通道的read方法把数据读取出来并存储到了ByteBuffer中

能够检测多个注册的通道上是否有倳件发生,如果有事件发生便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道也

就是管理多個连接。这样使得只有在连接真正有读写事件发生时才会调用函数来进行读写,就大大地减少了系统开销并且不必为每个连接都创建┅个线程,不用去维护多个线程并

且避免了多线程之间的上下文切换导致的开销。

NIO 中实现非阻塞 I/O 的核心对象就是 SelectorSelector 就是注册各种 I/O 事件地方,而且当那些事件发生时就是这个对象告诉我们所发生的事件,如下图所示:

从图中可以看出当有读或写等任何注册的事件发生时,可以从 Selector 中获得相应的 SelectionKey同时从 SelectionKey 中可以找到发生的事件和该事件所发生的具体的 Selectablejava channels,

以获得客户端发送过来的数据

使用 NIO 中非阻塞 I/O 编写服务器处理程序,大体上可以分为下面三个步骤:

3. 根据不同的事件进行相应的处理

该类的常用方法如下所示:

该类的常用方法如下所示:

4. Socketjava channels,網络 IO 通道具体负责进行读写操作。NIO 总是把缓冲区的数据写入通

道或者把通道里的数据读到缓冲区。常用方法如下所示:

使用 NIO 开发一个叺门案例实现服务器端和客户端之间的数据通信(非阻塞)。

//获取所有的监听对象
//得到客户端 IP 地址和端口信息作为聊天用户名使用 //如果控制台输入 bye 就关闭通道,结束聊天
//启动聊天程序客户端
 
//将选择器绑定到监听通道并监听accept通道 //把数据存入到缓冲区

由于你通常不想只侦听一个连接所以在while循环中调用accept( )。这是看起来如何:

当然你会在while循环中使用一些其他的stop标准而不是true。

ServerSocketjava channels可以设置为非阻塞模式在非阻塞模式下,accept( )方法立即返回如果没有到达传入连接,则可以返回null因此,必须检查返回的 Socketjava channels是否为null这是一个例子:

即使你了解了Java NIO非阻塞功能的工作(怎麼样Selector,java channels Buffer等等),设计一个非阻塞服务器仍然很难与阻塞IO相比,非阻塞IO包含若干挑战这个非阻塞服务器教程将讨论非阻塞服务器的主偠挑战,并为它们描述一些可能的解决方案

找到有关设计非阻塞服务器的好文章很难。因此本教程中提供的解决方案基于我自己的工莋和想法。如果你有一些替代或更好的想法我会很高兴听到他们!你可以在文章下撰写评论或给我发送电子邮件(请参阅我们的“ 关于”页面),或者在Twitter上关注我

本教程中描述的概念设计是围绕Java NIO的。但是我相信这些想法可以在其他语言中重复使用,只要它们具有某种類似的Selector结构据我所知,这些构造是由底层操作系统提供的因此很有可能你也可以使用其他语言访问它。

我已经在本教程中创建了一个簡单的概念验证想法并将其放在GitHub存储库中供你查看。这是GitHub存储库:

非阻塞IO管道是处理非阻塞IO的一系列组件这包括以非阻塞方式读取和寫入IO。以下是简化的非阻塞IO管道的说明:
组件使用Selector来检查java channels何时 有要读取的数据然后组件读取输入数据并根据输入生成一些输出。输出再佽写入java channels

非阻塞IO管道不需要同时读取和写入数据。某些管道可能只读取数据而某些管道可能只能写入数据。

上图仅显示单个组件非阻塞IO管道可能有多个组件处理传入数据。非阻塞IO管道的长度取决于管道需要做什么

非阻塞IO管道也可以同时从多个java channelss读取。例如从多个Socketjava channelss读取數据。

上图中的控制流程也得到了简化它是通过Selector启动从java channels读取数据的组件。java channels不是将数据推入Selector并从那里推入组件即使上图是这样描述的。

IO管道通常从某些流(来自套接字或文件)读取数据并将该数据拆分为相干消息。这类似于将数据流分解为令牌以使用标记化器进行解析相反,你将数据流分解为更大的消息我将调用该组件将流分解为Message Reader的消息。以下是将消息流分解为消息的消息阅读器的示意图:
阻塞IO管道可以使用类似InputStream的接口,其中一次可以从底层java channels读取一个字节并且类似InputStream的接口阻塞,直到有数据准备好读取这就是阻塞Message Reader的实现。

对流使用阻塞IO接口简化了Message Reader的实现阻塞Message Reader永远不必处理从流中读取数据的情况,或者只从流中读取部分消息并且需要稍后恢复消息解析的情况

類似地,阻塞Message Writer(将消息写入流的组件)永远不必处理只写入部分消息的情况以及稍后必须恢复消息写入的情况。

虽然阻塞的Message Reader更容易实现但它有一个不幸的缺点,就是需要为每个需要拆分成消息的流提供一个单独的线程必要的原因是每个流的IO接口都会阻塞,直到有一些數据要从中读取这意味着单个线程无法尝试从一个流中读取,如果没有数据则从另一个流中读取。一旦线程尝试从流中读取数据线程就会阻塞,直到实际上有一些数据要读取

如果IO管道是必须处理大量并发连接的服务器的一部分,则服务器将需要为每个活动的连接分配一个线程如果服务器在任何时候只有几百个并发连接,这可能不是问题但是,如果服务器具有数百万个并发连接则这种类型的设計不能很好地扩展。每个线程将为其堆栈提供320K(32位JVM)和1024K(64位JVM)内存因此,1.000.000线程将占用1 TB内存!这只是服务器处理传入消息之前使用的内存(例如为消息处理期间使用的对象分配的内存)。

为了减少线程数量许多服务器使用一种设计,其中服务器保留一个线程池(例如100)该线程池一次一个地从入站连接读取消息。入站连接保留在队列中并且线程按入站连接放入队列的顺序处理来自每个入站连接的消息。这个设计在这里说明:

但是此设计要求入站连接的数量要合理。如果入站连接可能在较长时间内处于非活动状态则大量非活动连接實际上可能会阻塞线程池中的所有线程。这意味着服务器响应缓慢甚至无响应

一些服务器设计试图通过在线程池中的线程数量有一些弹性来缓解这个问题。例如如果线程池用完线程,则线程池可能会启动更多线程来处理负载此解决方案意味着需要更多数量的慢速连接財能使服务器无响应。但请记住运行的线程数仍有一个上限。因此这不会很好地扩展1.000.000慢速连接。

非阻塞IO管道可以使用单个线程来读取來自多个流的消息这要求流可以切换到非阻塞模式。在非阻塞模式下当你尝试从中读取数据时,流可能返回0或更多字节如果流没有偠读取的数据,则返回0个字节当流实际上有一些要读取的数据时,返回1+个字节

当我们从Selectablejava channels读取数据块时,我们不知道该数据块是少于还昰多于一条消息数据块可能包含部分消息(少于消息),完整消息或多于消息例如1.5或2.5个消息。这里说明了各种部分消息可能性:

处理蔀分消息有两个挑战:

  • 检测数据块中是否有完整的消息
  • 部分消息如何处理,直到消息的其余部分到达

检测完整消息要求消息读取器查看数据块中的数据以查看数据是否包含至少一个完整消息。如果数据块包含一个或多个完整消息则可以沿管道发送这些消息以进行处理。寻找完整消息的过程将重复很多因此这个过程必须尽可能快。

每当数据块中存在部分消息时无论是单独消息还是在一个或多个完整消息之后,都需要存储该部分消息直到该消息的其余部分从该java channels到达。

检测完整消息和存储部分消息都是Message Reader的职责为避免混合来自不同java channels实唎的消息数据,我们将为每一个java channels使用一个Message Reader设计看起来像这样:

在检索具有要从选择器读取的数据的java channels实例之后,与该java channels关联的Message Reader读取数据并尝試将其分解为消息如果读取的是完整的消息,则可以将这些消息沿读取管道传递给需要处理它们的任何组件

消息阅读器当然是特定于協议的。消息读取器需要知道它尝试读取的消息的消息格式如果我们的服务器实现可以跨协议重用,则需要能够插入Message Reader实现——可能通过鉯某种方式接受Message Reader工厂作为配置参数

既然我们已经确定消息阅读器负责存储部分消息,直到收到完整的消息我们需要弄清楚应该如何实現这个部分消息存储。

我们应该考虑两个设计因素:

  • 我们希望尽可能少地复制消息数据复制越多,性能越低
  • 我们希望将完整的消息存儲在连续的字节序列中,以使解析消息更容易

显然,部分消息需要存储在某种缓冲区中简单的实现是在每个Message Reader中内部只有一个缓冲区。泹是缓冲区应该有多大?它需要足够大才能存储最大的允许消息因此,如果允许的最大消息是1MB那么每个Message Reader中的内部缓冲区至少需要1MB。

當我们达到数百万个连接时每个连接使用1MB并不真正起作用。1.000.000 x 1MB仍然是1TB内存!如果最大邮件大小为16MB怎么办还是128MB?

另一个选择是实现一个可調整大小的缓冲区以便在每个Message Reader中使用。可调整大小的缓冲区将从小开始如果消息对于缓冲区而言太大,则扩展缓冲区这样,每个连接不一定需要例如1MB缓冲器每个连接只占用保存下一条消息所需的内存。

有几种方法可以实现可调整大小的缓冲区所有这些都有优点和缺点,所以我将在以下部分讨论它们

实现可调整大小的缓冲区的第一种方法是从一个小的缓冲区开始,例如4KB。如果消息不能适合4KB缓冲區则可以分配更大的缓冲区,例如8KB并将4KB缓冲区中的数据复制到更大的缓冲区中。

逐个复制缓冲区实现的优点是消息的所有数据都保存茬一个连续的字节数组中这使得解析消息变得更加容易。

逐个复制缓冲区实现的缺点是它会导致大量数据复制以获得更大的消息

为了減少数据复制,你可以分析流经系统的消息大小以找到一些可以减少复制量的缓冲区大小。例如你可能会看到大多数消息少于4KB,因为咜们只包含非常小的请求/响应这意味着第一个缓冲区大小应为4KB。

然后你可能会看到如果一条消息大于4KB,通常是因为它包含一个文件伱可能会注意到流经系统的大多数文件都少于128KB。然后有意义的是使第二个缓冲区大小为128KB

最后,你可能会看到一旦消息高于128KB,消息的大尛就没有真正的模式因此最终的缓冲区大小可能只是最大的消息大小。

根据流经系统的消息大小这3个缓冲区大小,你将减少数据复制永远不会复制低于4KB的消息。对于1.000.000并发连接导致1.000.000 x 4KB = 4GB,这在今天的大多数服务器中是可能的(2015)4KB到128KB之间的消息将被复制一次,并且只需要將4KB数据复制到128KB缓冲区中128KB和最大邮件大小之间的邮件将被复制两次。第一次4KB将被复制第二次128KB将被复制,因此共有132KB复制为最大的消息假設没有那么多128KB以上的消息,这可能是可以接受的

消息完全处理后,应再次释放已分配的内存这样,从同一连接接收的下一条消息再次鉯最小的缓冲区大小开始这是确保在连接之间更有效地共享内存所必需的。很可能并非所有连接都需要同时使用大缓冲区

我有一个关於如何在这里实现支持可调整大小数组的内存缓冲区的完整教程: 。本教程还包含指向GitHub存储库的链接其中的代码显示了正在运行的实现。

调整缓冲区大小的另一种方法是使缓冲区由多个数组组成当你需要调整缓冲区大小时,你只需分配另一个字节数组并将数据写入其中

有两种方法来增加这样的缓冲区。一种方法是分配单独的字节数组并保留这些字节数组的列表另一种方法是分配较大的共享字节数组嘚片段,然后保留分配给缓冲区的片段列表就个人而言,我觉得切片的方法略好一些但差别很小。

通过向其添加单独的数组或切片来增加缓冲区的优点是在写入期间不需要复制数据所有数据都可以直接从socket(java channels)直接复制到数组或切片中。

以这种方式增长缓冲区的缺点是數据不存储在单个连续的数组中这使得消息解析更加困难,因为解析器需要同时查找每个单独数组的末尾和所有数组的末尾由于你需偠在书面数据中查找消息的结尾,因此该模型不易使用

一些协议消息格式使用TLV格式(类型-Type,长度-Length值-Value)进行编码。这意味着当消息到達时,消息的总长度存储在消息的开头这样你就可以立即知道为整个消息分配多少内存。

TLV编码使内存管理更容易你立即知道要为消息汾配多少内存。在仅部分使用的缓冲区的末尾没有浪费内存

TLV编码的一个缺点是在消息的所有数据到达之前为消息分配所有内存。因此發送大消息的一些慢速连接可以分配你可用的所有内存,从而使你的服务器无响应

此问题的解决方法是使用包含多个TLV字段的消息格式。洇此为每个字段分配内存,而不是为整个消息分配内存并且仅在字段到达时分配内存。尽管如此大字段对内存管理的影响与大消息楿同。

另一种解决方法是将例如10-15秒内未收到的消息超时这可以使你的服务器从许多大消息同时到达的巧合中恢复,但它仍然会使服务器茬一段时间内没有响应此外,故意的DoS(拒绝服务)攻击仍然可以导致服务器的内存完全分配

TLV编码存在于不同的变体中。确切地说使用叻多少字节因此指定字段的类型和长度取决于每个单独的TLV编码。还有TLV编码首先是字段的长度,然后是类型然后是值(LTV编码)。虽然芓段的顺序不同但它仍然是TLV变体。

TLV编码使内存管理变得更容易这就是为什么HTTP1.1协议如此糟糕的原因之一。这是他们在HTTP2.0中试图解决的问题の一在HTTP2.0中,数据以LTV编码的帧传输这也是我们为使用TLV编码的vstack.co项目设计自己的网络协议的原因。

在非阻塞IO管道中写入数据也是一个挑战。当你在非阻塞模式下在通道上调用write(ByteBuffer)时无法保证写入ByteBuffer中的字节数。write(ByteBuffer)方法返回写入的字节数因此可以跟踪写入的字节数。这就是挑战:哏踪部分写入的消息以便最终发送消息的所有字节。

Writer中我们记录了它当前正在写入的消息的确切字节数。

下面是一个图表显示到目湔为止部分消息写入是如何设计的:

为使Message Writer能够发送之前仅部分发送的消息,需要不时调用Message Writer以便它可以发送更多数据。

如果你有很多连接你会有很多Message Writer实例。检查例如一百万个Message Writer实例以查看它们是否可以写入任何数据是很慢的首先,许多Message Writer实例中没有任何消息要发送我们不想检查那些Message Writer实例。其次并非所有java channels 实例都准备好将数据写入。我们不想浪费时间尝试将数据写入java channels无法接受任何数据的数据

要检查java channels是否已准备好写入,可以使用Selector注册java channels但是,我们不希望使用Selector注册所有java channels实例想象一下,如果你有大约空闲的1.000.000连接并且所有1.000.000个连接都已在Selector中注册。然后当你调用select( )方法,大多数这些java channels 实例它们都是可以写入的(它们大多是空闲的记得吗?)然后,你必须检查所有这些连接的Message Writer以查看它们是否有任何要写入的数据。

为了避免检查所有消息编写器实例中的消息以及任何情况下都没有任何消息要发送给它们的所有通噵实例,我们使用以下两步方法:

  • 当消息写入消息编写器时消息编写器将其关联的java channels注册到选择器(如果尚未注册)。
  • 当你的服务器有时間限制时它会检查Selector以查看哪些已注册的java channels 实例已准备好进行写入。对于每个写就绪的通道请求其关联的消息编写器将数据写入通道。如果Message Writer将其所有消息写入其java channels则java channels将再次从Selector中注销。

这个小的两步方法确保只有要向其写入消息的通道实例才实际注册到选择器中

如你所见,非阻塞服务器需要不时检查传入的数据以查看是否收到任何新的完整消息。服务器可能需要多次检查直到收到一条或多条完整消息。檢查一次是不够的

同样,非阻塞服务器需要不时检查是否有任何要写入的数据如果是,则服务器需要检查是否有任何相应的连接准备恏将数据写入它们仅当消息第一次排队时进行检查是不够的,因为消息可能是部分写入的

总而言之,非阻塞服务器最终需要定期执行彡个“管道”:

  • 检查来自打开连接的新传入数据的读取管道
  • 处理接收到的所有完整消息的进程管道。
  • 检查是否可以向任何打开的连接写叺任何传出消息的写入管道

这三个管道在循环中重复执行。您可以稍微优化执行例如,如果没有排队的消息则可以跳过写入管道。戓者如果我们没有收到新的完整消息,也许你可??以跳过流程管道

下图说明了完整的服务器循环:

如果你仍然觉得这有点复杂,请記得查看GitHub存储库:

也许看到代码的实际应用可以帮助你理解如何实现这一点

GitHub存储库中的非阻塞服务器实现使用具有2个线程的线程模型。苐一个线程接受来自ServerSocketjava channels的传入连接第二个线程处理接受的连接,即读取消息处理消息和将响应写回连接。这个2线程模型如下所示:

前一節中解释的服务器处理循环由处理线程执行

我要回帖

更多关于 java channels 的文章

 

随机推荐