简述 Reactor 模式和 I/O 多路复用,并用 Java 实现一个简单的 Reactor 模型~
要求
- 对网络 IO 模型有一定认识。
- 具有一定的 socket 编程基础。
1. 前言
对于支持高效处理大量并发连接的网络程序,I/O 多路复用技术和 Reactor 模式必不可少;例如 Redis、Netty 等,我们一起来学习学习~
2. I/O 多路复用
![img.png]()
如图所示, 操作系统将网络连接抽象为 FD 文件描述符
, 多路复用器select/poll/epll
能够同时监听多个 FD,当进行select()
系统调用的时候,多路复用器能够将存在事件的 FD 返回。
实现的效果:
- 只需要使用一个线程/进程不断的进行
select()
,就能够做到同时监控多个网络连接的读写事件。
- 因为不需要一个线程/进程处理一个连接请求,线程/进程的数量不再成为瓶颈
- 多路:多个网路连接;复用:复用同一个线程/进程
2.1 代码实现
我们使用 Java 的 NIO 来实现一个简单的 I/O 多路复用程序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
|
public class IOMultiplexing {
public static void main(String[] args) throws IOException { Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(30000)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (!Thread.interrupted()) { selector.select(); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator();
while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove();
if (key.isAcceptable()) { SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { } } } } }
|
- 代码实例展示了服务端监听 30000 端口,等待 client 发起网络连接,即对应 OP_ACCEPT 事件;
- 监听到 OP_ACCEPT 事件之后,调用
accept()
获取代表为客户端连接的 socketChannel
,并且将其注册到 selector
;
- 然后进入下一轮
select()
; 这一次 select()
就会同时监听 serverSocketChannel
和 socketChannel
,此时可能产生 OP_READ
和 OP_ACCEPT
事件
可以看到,我们只需一个线程就能做到同时监听多个 FD 产生的事件,只需要一次 select()
系统调用就能同时处理多个 IO 事件,大大提高了 I/O 效率。
3. Reactor 模式
3.1 Reactor 是什么
维基百科上的定义是:
反应器模式(Reactor_pattern)是一种为处理服务请求并发 提交到一个或者多个服务处理程序的事件设计模式。当请求抵达后,服务处理程序使用解多路分配策略,然后同步地派发这些请求至相关的请求处理程序。
我的理解是:
Reactor 模式提供了一种优雅的方式来处理大量的并发 I/O 操作;将 Client 与 server 建立网络连接拆分成三个步骤:
- 监听 I/O 事件(监听事件)
- 根据 I/O 事件类型,将事件分发给不同的 Handler(分发事件)
- Handler 处理相应的 I/O 事件(处理事件)
Reactor 模式包含以下几个重要角色:
Reactor
反应堆,可以简单理解成事件产生的地方;并且针对 I/O 事件类型,进行分发,本质上是一个分发器
一般 Reactor 都选择 I/O 多路复用技术
,因为能够做到使用一个线程/进程就能够批量的监听多个 FD 产生的事件,非常高效~
Acceptor
专门处理 Accept 事件的地方;当 Reactor 监听到 Accept 事件的时候,就会将对应的 FD 丢给 Acceptor 进行连接处理;
Accept 处理的内容一般是:
调用 accept()
方法获取 client 连接,然后将连接注册到多路复用器,监听后续的读写事件
Handler
专门处理事件的地方;当 client 连接产生相应的读写事件之后,需要做的处理;
整体架构如下图所示
![img.png]()
3.2 单 Reactor 单线程模式
上图是一个最简单的单 Reactor 单线程模型,我们看看代码如何来实现
Reactor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| public class TinyReactor implements Runnable {
private final Selector selector; private final int port;
public TinyReactor(int port) throws IOException { this.selector = Selector.open(); this.port = port; }
public void run() { try(ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) { serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new TinyAcceptor(selector, serverSocketChannel)); while (!Thread.interrupted()) { this.selector.select(); Set<SelectionKey> selectionKeys = this.selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); this.dispatch(selectionKey); iterator.remove(); } } } catch (Exception e) { e.printStackTrace(); } }
private void dispatch(SelectionKey selectionKey) throws IOException { if (selectionKey.isAcceptable()) { TinyAcceptor accept = (TinyAcceptor) selectionKey.attachment(); accept.doAccept(); } else if (selectionKey.isReadable()) { EventHandler eventHandler = (EventHandler) selectionKey.attachment(); eventHandler.handle(SelectionKey.OP_READ); } else if (selectionKey.isWritable()) { EventHandler eventHandler = (EventHandler) selectionKey.attachment(); eventHandler.handle(SelectionKey.OP_WRITE); } } }
|
Reactor 借助 I/O 多路复用技术实现高效监听多个 FD 就绪事件,并且根据事件类型就行分发
Accept
事件类型,则分发给 TinyAcceptor
Read/Write
事件类型,则分发给 EventHandler
Accept
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class TinyAcceptor {
private Selector selector; private ServerSocketChannel serverSocketChannel;
public TinyAcceptor(Selector selector, ServerSocketChannel serverSocketChannel) { this.selector = selector; this.serverSocketChannel = serverSocketChannel; }
public void doAccept() throws IOException { SocketChannel socketChannel = this.serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ, new EventHandler(socketChannel)); } }
|
TinyAcceptor
做了两个事情:
- 调用
accept()
获取 client 连接
- 将获取到的连接注册到多路复用器中,并且监听
Read/Write
事件
Handler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class EventHandler {
private final SocketChannel socketChannel;
public EventHandler(SocketChannel socketChannel) { this.socketChannel = socketChannel; }
public void handle(int eventType) { if (eventType == SelectionKey.OP_READ) { new ReadEventHandler().handleEvent(this.socketChannel); } else if (eventType == SelectionKey.OP_WRITE) { new WriteEventHandler().handleEvent(this.socketChannel); } } }
|
这里实现的较为简易,主要想表达的意思是,handler 一般用来相应的读写事件
3.3 单 Reactor 多线程模式
上述是单 Reactor 单线程模式,从 Reactor 获取 I/O 事件,分发 I/O 事件,Accept 处理连接,到 Handler 处理读写请求都是使用同一个线程,如果 Handler 处理的逻辑是较为耗时的操作,则很容易拖垮整个程序的性能;我们很容易想到可以在 handler 引入线程池进行异步处理。
整体结构
![img.png]()
代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class EventHandler { private final SocketChannel socketChannel;
private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 2, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy() );
public EventHandler(SocketChannel socketChannel) { this.socketChannel = socketChannel; }
public void handle(int eventType) { if (eventType == SelectionKey.OP_READ) { threadPool.submit(new ReadEventHandler(socketChannel)); } else if (eventType == SelectionKey.OP_WRITE) { threadPool.submit(new WriteEventHandler(socketChannel)); } } }
|
可以看到,我们 handler 执行不再使用 I/O 线程,而是丢到一个线程池中执行,这就是 单 Reactor 多线程模型
3.4 多 Reactor 多线程模式
上述两种模式的区别在于 handler 执行是 I/O 线程还是提交到线程池中去执行;我们 Reactor 模型需要建立客户端连接、监听就绪事件 accept()
、select()
;当面对并发客户端连接时,显然 Reactor 就成为了瓶颈。
所以衍生出多 Reactor 多线程模式
整体结构
![img.png]()
Reactor 划分为
- 主 Reactor
- 仅仅监听处理 Accept 事件,然后将 Accept 事件分发给 Acceptor
- Acceptor 负责调用
accept()
获取 client 连接,并且将连接注册到一个 从 Reactor 中
- 从 Reactor
- 仅仅负责监听处理
Read/Write
事件,然后将事件提交到 Handler 线程池中进行执行
主 Reactor 的线程仅处理 Accept 事件,不再监听处理Read/Write
事件, 而是提交给 从 Reactor 来处理;这样主 Reactor 能够专注于处理客户端连接,应对并发连接场景也能够高效处理~
代码实现
MainReactor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
|
public class MainReactor implements Runnable {
private final Selector selector; private final int port;
public MainReactor(int port) throws IOException { this.selector = Selector.open(); this.port = port; }
public void run() { try(ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) { serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new TinyAcceptor(selector, serverSocketChannel)); while (!Thread.interrupted()) { this.selector.select(); Set<SelectionKey> selectionKeys = this.selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); this.dispatch(selectionKey); iterator.remove(); } } } catch (Exception e) { e.printStackTrace(); } }
private void dispatch(SelectionKey selectionKey) throws IOException { if (selectionKey.isAcceptable()) { TinyAcceptor accept = (TinyAcceptor) selectionKey.attachment(); accept.doAccept(); } } }
|
可以看到 MainReactor 仅仅处理 Accept 事件
TinyReactor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| public class TinyAcceptor {
public static final int DEFAULT_SUB_REACTOR_NUM = 4;
private Selector selector; private ServerSocketChannel serverSocketChannel; private final List<SubReactor> subReactorList = new ArrayList<>(DEFAULT_SUB_REACTOR_NUM);
private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 4, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy() );
public TinyAcceptor(Selector selector, ServerSocketChannel serverSocketChannel) throws IOException { this.selector = selector; this.serverSocketChannel = serverSocketChannel;
for (int i = 0; i < DEFAULT_SUB_REACTOR_NUM; i++) { SubReactor subReactor = new SubReactor(); subReactorList.add(subReactor); threadPool.execute(subReactor); } }
public void doAccept() throws IOException { SocketChannel socketChannel = this.serverSocketChannel.accept(); socketChannel.configureBlocking(false); SubReactor subReactor = LoadBalance.getSubReactor(subReactorList); subReactor.getSelector().wakeup(); socketChannel.register(subReactor.getSelector(), SelectionKey.OP_READ | SelectionKey.OP_WRITE); } }
|
获取到 client 连接之后,通过负载均衡选择一个 从 Reactor,将 client 连接注册到 从 Reactor 上,并且仅关注读/写事件
SubReactor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| public class SubReactor implements Runnable {
private final Selector selector;
public SubReactor() throws IOException { this.selector = Selector.open(); }
public void run() { while (!Thread.interrupted()) { try { this.selector.select(); Set<SelectionKey> selectionKeys = this.selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); this.dispatch(selectionKey); iterator.remove(); } } catch (IOException e) { e.printStackTrace(); } } }
private void dispatch(SelectionKey selectionKey) throws IOException { if (selectionKey.isReadable()) { EventHandler eventHandler = (EventHandler) selectionKey.attachment(); eventHandler.handle(SelectionKey.OP_READ); } else if (selectionKey.isWritable()) { EventHandler eventHandler = (EventHandler) selectionKey.attachment(); eventHandler.handle(SelectionKey.OP_WRITE); } }
}
|
从 Reactor 的逻辑与 主 Reactor 基本相同,都是不断调用 select()
获取就绪事件,然后进行分发。
3.5 上述代码地址
https://github.com/OneCastle5280/tiny-reactor.git
3.4 总结
本篇文章简单介绍总结了 I/O 多路复用和 Reactor 模型,Redis 使用的是单 Reactor 模型,Netty 支持上述三种模型,我也是在学习 Redis、Netty 过程中,发现对这些核心基础概念理解的并不是很透彻,就花点时间学习总结~