java nio

前言

Java IO应用于网络通讯、微服务调用框架、消息中间件等各个领域。主要有三种形式:BIO(Blocking IO)、NIO(NonBlocking IO)、AIO(Asynchronized IO)。我们主要介绍下Java NIO与传统IO的不同以及NIO的底层原理。

传统IO

传统的IO包为java.io包,是面向流操作的,并且在IO操作时是同步阻塞的。传统的IO主要包含一下几种类型:

  • 字节流(InputStream、OutputStream)操作
  • 字符流(Writer、Reader)操作
  • 基于磁盘操作的IO(FileInputStream、FileOutputStream)
  • 基于网络的操作(java.net包下的Socket)

其中字节流操作的相关类是基于装饰器模式实现的,字节流操作相关类的关系如图1所示:

1566124162591

图1 io装饰者模式架构

如图1所示,最上层InputStream为抽象组件,具体组件类包含File、StringBuffer、ByteArray三个扩展类。FilterInputStream为装饰器抽象类,第三层的类为具体的装饰器扩展类,增加具体的装饰行为。

Stream的用法如下代码所示,首先通过File输入或输出字节文件,然后进行读写操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
try {
OutputStream outputStream = new FileOutputStream("E:\\Githup\\java-back-end\\io-file\\iotest.txt");
String string = "Mrs Siri\n";
outputStream.write(string.getBytes());
outputStream.write(string.getBytes());
outputStream.close();

InputStream inputStream = new FileInputStream("E:\\Githup\\java-back-end\\io-file\\iotest.txt");
byte[] bytes = new byte[inputStream.available()];
inputStream.read(bytes);
String str = new String(bytes, "utf-8");
System.out.println(str);
inputStream.close();
}catch (Exception e){
System.out.println(e.getMessage());
}

writer、reader的用法如下所示:

1
2
3
4
5
6
7
8
9
10
11
Writer writer = new FileWriter("E:\\Githup\\java-back-end\\io-file\\iotest2.txt", true);
writer.append("hello");
writer.flush();
writer.close();

Reader reader = new FileReader("E:\\Githup\\java-back-end\\io-file\\iotest2.txt");
BufferedReader bufferedReader = new BufferedReader(reader);
String s = null;
while ((s = bufferedReader.readLine()) !=null){
System.out.println(s);
}

NIO

单线程socket连接

网络IO中,传统的socket编码是阻塞同步的。最原始的网络编程思路是while循环不断监听socket,然后调用handler处理。

1
2
3
4
while(true){
socket = accept();
handler(socket);
}

多线程socket连接

单线程处理效率很低,当前handler没有处理完成,后面请求只能被阻塞。优化方案是使用多线程,即每个连接独占一条线程,socket服务端代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static void main(String[] args) throws Exception {
new Thread(new ServerSocketDemo()).start();
}

public class ServerSocketDemo implements Runnable {

@Override
public void run() {
try {
ServerSocket serverSocket = new ServerSocket(8081);
while (!Thread.interrupted()) {
// accept阻塞获取socket连接
new Thread(new Handler(serverSocket.accept())).start();
}
//client.close();
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
static class Handler implements Runnable {
final Socket socket;
Handler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
// peocess socket
}
}
}

这种方式每建立一个socket连接接分配一个Handler线程去处理socket连接,连接模型如图2所示。这种方式相比于单线程来说提高了吞吐量,并且不同的socket之间互不影响。缺点是严重依赖CPU线程数,CPU线程数少,连接过多时系统将无法承受;并且线程的创建销毁比较耗时间。

1566312901214

图2 socket多线程模型

reactor模式

对于多线程socket连接方式有一种比较好的改进方法:利用事件驱动的思想,只有当有时间触发时才调用线程去调用回调函数处理相应的事件。这样一个线程就可以处理多个socket连接事件。reactor模式的模型如图3所示:(参考连接)

图3 reactor模式(工作单线程)原理图

对应于nio中的几个变量含义:

  • Channels 通道,表示文件的连接,支持非阻塞读,例如socket连接
  • Buffers 缓存区,nio是面向缓存区的,所有的操作都需要通过缓存。缓存能够直接对Channels进行读写操作
  • Selectors 选择器,用来监听哪些Channels通道上面有IO事件
  • SelectionKeys 选择器key,维护了IO事件状态以及相关的绑定关系(channel、selector、attachment)

结合图3结构,我们查看反应堆reactor的创建过程如下代码所示,首想将ServerSocketChannel注册到selector选择器上,并将感兴趣事件设置为accept类型,即新建连接事件。注册之后返回一个SelectionKey对象用于存放通道和选择器之间的关系以及感兴趣的事件类型,并将回调对象attachment添加到selectionKey中。在run函数中,获取感兴趣事件,并通过分发器调用selectionkey的回调对象attachment。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException {
// 打开选择器监听IO时间
selector = Selector.open();
// 开启socketchannel,并绑相应端口号
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
// 设为非阻塞
serverSocket.configureBlocking(false);
// 注册serversocket(通道)到selector器上面
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
// 将Acceptor作为sk的回调对象
sk.attach(new Acceptor());
}

// class Reactor continued
public void run() {// normally in a new Thread
try {
while (!Thread.interrupted()) {
// 阻塞获取准备就绪的事件
selector.select();
// 选出有事件触发的selectionKeys
Set selected = selector.selectedKeys();
// 利用迭代器循环遍历sk,并利用dispach分发器调用sk的attachment对象
Iterator it = selected.iterator();
while (it.hasNext()) {
dispatch((SelectionKey)it.next());
}
}
}catch (IOException ex) { /* ... */ }

}

// 分发器,调用sk的attachment对象
void dispatch(SelectionKey k) {
Runnable r = (Runnable)(k.attachment());
if (r != null)
r.run();
}
// 回调对象(获取新的socket连接)
class Acceptor implements Runnable { // inner
public void run() {
try {
// 获取socket连接
SocketChannel c = serverSocket.accept();
if (c != null)
new Handler(selector, c);
} catch (IOException ex) { /* ... */ }
}
}
}

创建完reactor之后,会获取到新的socket连接。之后调用handler对象处理socketchannel和selector,Handler代码如下所示,也是首先将通道注册到选择器,设置感兴趣的事件类型,添加回调对象等操作。在selector中通过dispatch函数调用对应的回调对象进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// handler回调函数处理channel
final class Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector sel, SocketChannel c) throws IOException {
socket = c;
c.configureBlocking(false);
// Optionally try first read now
// 将soscket(通道)注册到选择器上面
sk = socket.register(sel, 0);
// 将本类添加为回调类
sk.attach(this);
// 注册感兴趣的类型:读事件
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
boolean inputIsComplete() { /* ... */ }
boolean outputIsComplete() { /* ... */ }
void process() { /* ... */ }

// 先读后写
public void run() {
try {
if (state == READING) read();
else if (state == SENDING) send();
} catch (IOException ex) { /* ... */ }
}
void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// Normally also do first write now
sk.interestOps(SelectionKey.OP_WRITE);
}
}
void send() throws IOException {
socket.write(output);
if (outputIsComplete()) sk.cancel();
}

}

reactor模式改进

上面是单线程reactor模式已经实现,想进一步优化可在handler中引入线程池。利用线程池来处理send、read、decode、compute、encode等操作。结构图为:

图4 reactor工作多线程模式架构

Handler代码实例如下所示,在Handler中每次调用read函数时通过线程池执行其中的Processer函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Handler implements Runnable {
// 利用线程池
static PooledExecutor pool = new PooledExecutor(...);
static final int PROCESSING = 3;
// ...
synchronized void read() { // ...
socket.read(input);
if (inputIsComplete()) {
state = PROCESSING;
// 线程池异步调用处理函数
pool.execute(new Processer());
}
}
synchronized void processAndHandOff() {
process();
state = SENDING; // or rebind attachment
sk.interest(SelectionKey.OP_WRITE);
}
class Processer implements Runnable {
public void run() { processAndHandOff(); }
}
}

以上就是nio的精髓reactor模式,不再是基于一个线程一个连接,而是一个线程管理多个socket连接,通过轮询的方式监听通道的事件,当有事件发生时会触发分发器新建线程调用回调对象。也就是说只有socket连接有任务时,才会新建线程处理,没有任务时不会新建线程处理,这大大提高了CPU资源的利用率,解决了硬件限制导致连接数不能过多的问题。

netty源码分析

在netty中,NioEventLoop类相当于reactor模式中的reactor类,里面有selector、selectedKeys。利用register函数将channel注册到unwrappedSelector上面。register代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 注册函数 ch为channel, interestOps是感兴趣的事件类型, task相当于attachment,是dispatch所调用的回调函数
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {

……

if (inEventLoop()) {
// 注册函数
register0(ch, interestOps, task);
} else {
try {
// Offload to the EventLoop as otherwise java.nio.channels.spi.AbstractSelectableChannel.register
// may block for a long time while trying to obtain an internal lock that may be hold while selecting.
submit(new Runnable() {
@Override
public void run() {
register0(ch, interestOps, task);
}
}).sync();
} catch (InterruptedException ignore) {
// Even if interrupted we did schedule it so just mark the Thread as interrupted.
Thread.currentThread().interrupt();
}
}
}

// 注册函数
private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
try {
// 调用channel的注册函数
ch.register(unwrappedSelector, interestOps, task);
} catch (Exception e) {
throw new EventLoopException("failed to register a channel", e);
}
}

接下来看下在channel中如何注册事件,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException {
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (blocking)
throw new IllegalBlockingModeException();
// 查找key是否存在,已存在证明通道已注册到selector上面,不需要重复注册
SelectionKey k = findKey(sel);
if (k != null) {
// 若已注册,则只需设置感兴趣事件类型和回调函数
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
// 将channel注册到ops上面去,并将生成绑定关系key
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}