IO 多路复用与 Reactor 模式

简述 Reactor 模式和 I/O 多路复用,并用 Java 实现一个简单的 Reactor 模型~

要求

  1. 对网络 IO 模型有一定认识。
  2. 具有一定的 socket 编程基础。

1. 前言

对于支持高效处理大量并发连接的网络程序,I/O 多路复用技术和 Reactor 模式必不可少;例如 Redis、Netty 等,我们一起来学习学习~

2. I/O 多路复用

img.png
如图所示, 操作系统将网络连接抽象为 FD 文件描述符, 多路复用器select/poll/epll能够同时监听多个 FD,当进行select()系统调用的时候,多路复用器能够将存在事件的 FD 返回。

实现的效果:

  1. 只需要使用一个线程/进程不断的进行select(),就能够做到同时监控多个网络连接的读写事件。
  2. 因为不需要一个线程/进程处理一个连接请求,线程/进程的数量不再成为瓶颈
  3. 多路:多个网路连接;复用:复用同一个线程/进程

2.1 代码实现

我们使用 Java 的 NIO 来实现一个简单的 I/O 多路复用程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* @author wangjiabao
*/
public class IOMultiplexing {

public static void main(String[] args) throws IOException {
// 多路复用器,select/poll/epoll
Selector selector = Selector.open();
// 打开服务器套接字通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 非阻塞模式
serverSocketChannel.configureBlocking(false);
// 监听端口
serverSocketChannel.socket().bind(new InetSocketAddress(30000));
// 注册到选择器上,监听接受事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 事件循环
while (!Thread.interrupted()) {
// 阻塞直到至少有一个通道产生 I/O 事件
selector.select();
// 获取存在就绪事件的 FD
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();

while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove(); // 避免重复处理

if (key.isAcceptable()) {
// 处理客户端连接事件
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
// 将 client 连接注册到 selector 中,统一监听,监听可读事件
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
// 处理可读事件
}
}
}
}
}
  1. 代码实例展示了服务端监听 30000 端口,等待 client 发起网络连接,即对应 OP_ACCEPT 事件;
  2. 监听到 OP_ACCEPT 事件之后,调用 accept() 获取代表为客户端连接的 socketChannel ,并且将其注册到 selector;
  3. 然后进入下一轮 select(); 这一次 select() 就会同时监听 serverSocketChannelsocketChannel,此时可能产生 OP_READOP_ACCEPT 事件

可以看到,我们只需一个线程就能做到同时监听多个 FD 产生的事件,只需要一次 select() 系统调用就能同时处理多个 IO 事件,大大提高了 I/O 效率。

3. Reactor 模式

3.1 Reactor 是什么

维基百科上的定义是:

反应器模式(Reactor_pattern)是一种为处理服务请求并发 提交到一个或者多个服务处理程序的事件设计模式。当请求抵达后,服务处理程序使用解多路分配策略,然后同步地派发这些请求至相关的请求处理程序。

我的理解是:

Reactor 模式提供了一种优雅的方式来处理大量的并发 I/O 操作;将 Client 与 server 建立网络连接拆分成三个步骤:

  1. 监听 I/O 事件(监听事件)
  2. 根据 I/O 事件类型,将事件分发给不同的 Handler(分发事件)
  3. Handler 处理相应的 I/O 事件(处理事件)

Reactor 模式包含以下几个重要角色:

Reactor

反应堆,可以简单理解成事件产生的地方;并且针对 I/O 事件类型,进行分发,本质上是一个分发器

一般 Reactor 都选择 I/O 多路复用技术,因为能够做到使用一个线程/进程就能够批量的监听多个 FD 产生的事件,非常高效~

Acceptor

专门处理 Accept 事件的地方;当 Reactor 监听到 Accept 事件的时候,就会将对应的 FD 丢给 Acceptor 进行连接处理;

Accept 处理的内容一般是:
调用 accept() 方法获取 client 连接,然后将连接注册到多路复用器,监听后续的读写事件

Handler

专门处理事件的地方;当 client 连接产生相应的读写事件之后,需要做的处理;

整体架构如下图所示
img.png

3.2 单 Reactor 单线程模式

上图是一个最简单的单 Reactor 单线程模型,我们看看代码如何来实现

Reactor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class TinyReactor implements Runnable {

private final Selector selector;
private final int port;

public TinyReactor(int port) throws IOException {
this.selector = Selector.open();
this.port = port;
}

public void run() {
try(ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
// 非阻塞模式
serverSocketChannel.configureBlocking(false);
// 绑定端口
serverSocketChannel.socket().bind(new InetSocketAddress(port));
// 注册到多路复用器中,att 是一个 Accept
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new TinyAcceptor(selector, serverSocketChannel));
while (!Thread.interrupted()) {
// 阻塞直到至少有一个通道产生 I/O 事件
this.selector.select();
// 获取存在就绪事件的 FD
Set<SelectionKey> selectionKeys = this.selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
// 分发
this.dispatch(selectionKey);
iterator.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 根据不同的 IO 事件类型进行分发处理
*
* @param selectionKey
* @throws IOException
*/
private void dispatch(SelectionKey selectionKey) throws IOException {
if (selectionKey.isAcceptable()) {
// 连接事件则分发给 Acceptor
TinyAcceptor accept = (TinyAcceptor) selectionKey.attachment();
accept.doAccept();
} else if (selectionKey.isReadable()) {
// 可读事件则分发给 handler
EventHandler eventHandler = (EventHandler) selectionKey.attachment();
eventHandler.handle(SelectionKey.OP_READ);
} else if (selectionKey.isWritable()) {
// 可写事件则分发给 handler
EventHandler eventHandler = (EventHandler) selectionKey.attachment();
eventHandler.handle(SelectionKey.OP_WRITE);
}
}
}

Reactor 借助 I/O 多路复用技术实现高效监听多个 FD 就绪事件,并且根据事件类型就行分发

  1. Accept 事件类型,则分发给 TinyAcceptor
  2. Read/Write 事件类型,则分发给 EventHandler

Accept

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class TinyAcceptor {

private Selector selector;
private ServerSocketChannel serverSocketChannel;

public TinyAcceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
this.selector = selector;
this.serverSocketChannel = serverSocketChannel;
}

public void doAccept() throws IOException {
// 获取到客户端连接
SocketChannel socketChannel = this.serverSocketChannel.accept();
socketChannel.configureBlocking(false);
// 注册到 selector 中, 监听读、写事件
socketChannel.register(selector, SelectionKey.OP_READ, new EventHandler(socketChannel));
}
}

TinyAcceptor 做了两个事情:

  1. 调用 accept() 获取 client 连接
  2. 将获取到的连接注册到多路复用器中,并且监听Read/Write 事件

Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class EventHandler {

private final SocketChannel socketChannel;

public EventHandler(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}

public void handle(int eventType) {
if (eventType == SelectionKey.OP_READ) {
new ReadEventHandler().handleEvent(this.socketChannel);
} else if (eventType == SelectionKey.OP_WRITE) {
new WriteEventHandler().handleEvent(this.socketChannel);
}
}
}

这里实现的较为简易,主要想表达的意思是,handler 一般用来相应的读写事件

3.3 单 Reactor 多线程模式

上述是单 Reactor 单线程模式,从 Reactor 获取 I/O 事件,分发 I/O 事件,Accept 处理连接,到 Handler 处理读写请求都是使用同一个线程,如果 Handler 处理的逻辑是较为耗时的操作,则很容易拖垮整个程序的性能;我们很容易想到可以在 handler 引入线程池进行异步处理。

整体结构

img.png

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class EventHandler {
private final SocketChannel socketChannel;

// 核心线程、最大 10 线程的线程池
private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2, // 核心线程数
10, // 最大线程数
60L, TimeUnit.SECONDS, // 当超过核心线程闲置时的存活时间
new LinkedBlockingQueue<Runnable>(), // 任务队列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);

public EventHandler(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}

public void handle(int eventType) {
if (eventType == SelectionKey.OP_READ) {
// 提交读事件处理器到线程池
threadPool.submit(new ReadEventHandler(socketChannel));
} else if (eventType == SelectionKey.OP_WRITE) {
// 提交写事件处理器到线程池
threadPool.submit(new WriteEventHandler(socketChannel));
}
}
}

可以看到,我们 handler 执行不再使用 I/O 线程,而是丢到一个线程池中执行,这就是 单 Reactor 多线程模型

3.4 多 Reactor 多线程模式

上述两种模式的区别在于 handler 执行是 I/O 线程还是提交到线程池中去执行;我们 Reactor 模型需要建立客户端连接、监听就绪事件 accept()select();当面对并发客户端连接时,显然 Reactor 就成为了瓶颈。
所以衍生出多 Reactor 多线程模式

整体结构

img.png

Reactor 划分为

  1. 主 Reactor
    • 仅仅监听处理 Accept 事件,然后将 Accept 事件分发给 Acceptor
    • Acceptor 负责调用accept()获取 client 连接,并且将连接注册到一个 从 Reactor 中
  2. 从 Reactor
    • 仅仅负责监听处理 Read/Write 事件,然后将事件提交到 Handler 线程池中进行执行

主 Reactor 的线程仅处理 Accept 事件,不再监听处理Read/Write 事件, 而是提交给 从 Reactor 来处理;这样主 Reactor 能够专注于处理客户端连接,应对并发连接场景也能够高效处理~

代码实现

MainReactor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* 主 Reactor
*
* @author wangjiabao
*/
public class MainReactor implements Runnable {

private final Selector selector;
private final int port;

public MainReactor(int port) throws IOException {
this.selector = Selector.open();
this.port = port;
}

public void run() {
try(ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
// 非阻塞模式
serverSocketChannel.configureBlocking(false);
// 绑定端口
serverSocketChannel.socket().bind(new InetSocketAddress(port));
// 注册到多路复用器中,att 是一个 Accept
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new TinyAcceptor(selector, serverSocketChannel));
while (!Thread.interrupted()) {
// 阻塞直到至少有一个通道产生 I/O 事件
this.selector.select();
// 获取存在就绪事件的 FD
Set<SelectionKey> selectionKeys = this.selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
// 分发
this.dispatch(selectionKey);
iterator.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 根据不同的 IO 事件类型进行分发处理
*
* @param selectionKey
* @throws IOException
*/
private void dispatch(SelectionKey selectionKey) throws IOException {
if (selectionKey.isAcceptable()) {
// 连接事件则分发给 Acceptor
TinyAcceptor accept = (TinyAcceptor) selectionKey.attachment();
accept.doAccept();
}
}
}

可以看到 MainReactor 仅仅处理 Accept 事件

TinyReactor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class TinyAcceptor {

public static final int DEFAULT_SUB_REACTOR_NUM = 4;

private Selector selector;
private ServerSocketChannel serverSocketChannel;
// 从 Reactor
private final List<SubReactor> subReactorList = new ArrayList<>(DEFAULT_SUB_REACTOR_NUM);

// 创建一个具有 4 核心线程、最大 10 线程的线程池
private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
4, // 核心线程数
10, // 最大线程数
60L, TimeUnit.SECONDS, // 当超过核心线程闲置时的存活时间
new LinkedBlockingQueue<Runnable>(), // 任务队列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);

public TinyAcceptor(Selector selector, ServerSocketChannel serverSocketChannel) throws IOException {
this.selector = selector;
this.serverSocketChannel = serverSocketChannel;

// init subReactor
for (int i = 0; i < DEFAULT_SUB_REACTOR_NUM; i++) {
SubReactor subReactor = new SubReactor();
subReactorList.add(subReactor);
// 加入到线程池中启动
threadPool.execute(subReactor);
}
}

public void doAccept() throws IOException {
// 获取到客户端连接
SocketChannel socketChannel = this.serverSocketChannel.accept();
socketChannel.configureBlocking(false);
// 选择一个 从 Reactor
SubReactor subReactor = LoadBalance.getSubReactor(subReactorList);
// 唤醒 select()
subReactor.getSelector().wakeup();
// 注册读写事件
socketChannel.register(subReactor.getSelector(), SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
}

获取到 client 连接之后,通过负载均衡选择一个 从 Reactor,将 client 连接注册到 从 Reactor 上,并且仅关注读/写事件

SubReactor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class SubReactor implements Runnable {

private final Selector selector;

public SubReactor() throws IOException {
this.selector = Selector.open();
}

public void run() {
while (!Thread.interrupted()) {
try {
this.selector.select();
Set<SelectionKey> selectionKeys = this.selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
// 分发
this.dispatch(selectionKey);
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

/**
* 根据不同的 IO 事件类型进行分发处理
*
* @param selectionKey
* @throws IOException
*/
private void dispatch(SelectionKey selectionKey) throws IOException {
// 从 Reactor 只有读写事件
if (selectionKey.isReadable()) {
// 可读事件
EventHandler eventHandler = (EventHandler) selectionKey.attachment();
eventHandler.handle(SelectionKey.OP_READ);
} else if (selectionKey.isWritable()) {
// 可写事件
EventHandler eventHandler = (EventHandler) selectionKey.attachment();
eventHandler.handle(SelectionKey.OP_WRITE);
}
}

}

从 Reactor 的逻辑与 主 Reactor 基本相同,都是不断调用 select() 获取就绪事件,然后进行分发。

3.5 上述代码地址

https://github.com/OneCastle5280/tiny-reactor.git

3.4 总结

本篇文章简单介绍总结了 I/O 多路复用和 Reactor 模型,Redis 使用的是单 Reactor 模型,Netty 支持上述三种模型,我也是在学习 Redis、Netty 过程中,发现对这些核心基础概念理解的并不是很透彻,就花点时间学习总结~