「这是我参与11月更文挑战的第22天,活动详情查看:2021最后一次更文挑战」
上一篇文章,分析了Netty服务端启动的初始化过程,今天我们来分析一下Netty中的Reactor线程模型
在分析源码之前,我们先分析,哪些地方用到了EventLoop?
- NioServerSocketChannel的连接监听注册
- NioSocketChannel的IO事件注册
NioServerSocketChannel连接监听
在AbstractBootstrap类的initAndRegister()方法中,当NioServerSocketChannel初始化完成后,会调用case
标记位置的代码进行注册。
1 | java复制代码final ChannelFuture initAndRegister() { |
AbstractNioChannel.doRegister
按照代码的执行逻辑,最终会执行到AbstractNioChannel的doRegister()
方法中。
1 | java复制代码@Override |
NioEventLoop的启动过程
NioEventLoop是一个线程,它的启动过程如下。
在AbstractBootstrap的doBind0方法中,获取了NioServerSocketChannel中的NioEventLoop,然后使用它来执行绑定端口的任务。
1 | java复制代码private static void doBind0( |
SingleThreadEventExecutor.execute
然后一路执行到SingleThreadEventExecutor.execute方法中,调用startThread()
方法启动线程。
1 | java复制代码private void execute(Runnable task, boolean immediate) { |
startThread
1 | java复制代码private void startThread() { |
接着调用doStartThread()方法,通过executor.execute
执行一个任务,在该任务中启动了NioEventLoop线程
1 | java复制代码private void doStartThread() { |
NioEventLoop的轮询过程
当NioEventLoop线程被启动后,就直接进入到NioEventLoop的run方法中。
1 | java复制代码protected void run() { |
NioEventLoop的执行流程
NioEventLoop中的run方法是一个无限循环的线程,在该循环中主要做三件事情,如图9-1所示。
图9-1
- 轮询处理I/O事件(select),轮询Selector选择器中已经注册的所有Channel的I/O就绪事件
- 处理I/O事件,如果存在已经就绪的Channel的I/O事件,则调用
processSelectedKeys
进行处理 - 处理异步任务(runAllTasks),Reactor线程有一个非常重要的职责,就是处理任务队列中的非I/O任务,Netty提供了ioRadio参数用来调整I/O时间和任务处理的时间比例。
轮询I/O就绪事件
我们先来看I/O时间相关的代码片段:
- 通过
selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())
获取当前的执行策略 - 根据不同的策略,用来控制每次轮询时的执行策略。
1 | java复制代码protected void run() { |
selectStrategy处理逻辑
1 | java复制代码@Override |
如果hasTasks
为true,表示当前NioEventLoop线程存在异步任务的情况下,则调用selectSupplier.get()
,否则直接返回SELECT
。
其中selectSupplier.get()
的定义如下:
1 | java复制代码private final IntSupplier selectNowSupplier = new IntSupplier() { |
该方法中调用的是selectNow()
方法,这个方法是Selector选择器中的提供的非阻塞方法,执行后会立刻返回。
- 如果当前已经有就绪的Channel,则会返回对应就绪Channel的数量
- 否则,返回0.
分支处理
在上面一个步骤中获得了strategy之后,会根据不同的结果进行分支处理。
- CONTINUE,表示需要重试。
- BUSY_WAIT,由于在NIO中并不支持BUSY_WAIT,所以BUSY_WAIT和SELECT的执行逻辑是一样的
- SELECT,表示需要通过select方法获取就绪的Channel列表,当NioEventLoop中不存在异步任务时,也就是任务队列为空,则返回该策略。
1 | java复制代码switch (strategy) { |
SelectStrategy.SELECT
当NioEventLoop线程中不存在异步任务时,则开始执行SELECT策略
1 | java复制代码//下一次定时任务触发截至时间,默认不是定时任务,返回 -1L |
select方法定义如下,默认情况下deadlineNanos=NONE
,所以会调用select()
方法阻塞。
1 | java复制代码private int select(long deadlineNanos) throws IOException { |
最终返回就绪的channel个数,后续的逻辑中会根据返回的就绪channel个数来决定执行逻辑。
NioEventLoop.run中的业务处理
业务处理的逻辑相对来说比较容易理解
- 如果有就绪的channel,则处理就绪channel的IO事件
- 处理完成后同步执行异步队列中的任务。
- 另外,这里为了解决Java NIO中的空转问题,通过selectCnt记录了空转次数,一次循环发生了空转(既没有IO需要处理、也没有执行任何任务),那么记录下来(selectCnt); ,如果连续发生空转(selectCnt达到一定值),netty认为触发了NIO的BUG(unexpectedSelectorWakeup处理);
Java Nio中有一个bug,Java nio在Linux系统下的epoll空轮询问题。也就是在
select()
方法中,及时就绪的channel为0,也会从本来应该阻塞的操作中被唤醒,从而导致CPU 使用率达到100%。
1 | java复制代码@Override |
processSelectedKeys
通过在select
方法中,我们可以获得就绪的I/O事件数量,从而触发执行processSelectedKeys
方法。
1 | java复制代码private void processSelectedKeys() { |
处理I/O事件时,有两个逻辑分支处理:
- 一种是处理Netty优化过的selectedKeys,
- 另一种是正常的处理逻辑
processSelectedKeys方法中根据是否设置了selectedKeys
来判断使用哪种策略,默认使用的是Netty优化过的selectedKeys,它返回的对象是SelectedSelectionKeySet
。
processSelectedKeysOptimized
1 | java复制代码private void processSelectedKeysOptimized() { |
processSelectedKey
1 | java复制代码private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { |
NioMessageUnsafe.read()
假设此时是一个读操作,或者是客户端建立连接,那么代码执行逻辑如下,
1 | java复制代码@Override |
SelectedSelectionKeySet的优化
Netty中自己封装实现了一个SelectedSelectionKeySet,用来优化原本SelectorKeys的结构,它是怎么进行优化的呢?先来看它的代码定义
1 | java复制代码final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> { |
SelectedSelectionKeySet内部使用的是SelectionKey数组,所有在processSelectedKeysOptimized方法中可以直接通过遍历数组来取出就绪的I/O事件。
而原来的Set<SelectionKey>
返回的是HashSet类型,两者相比,SelectionKey[]不需要考虑哈希冲突的问题,所以可以实现O(1)时间复杂度的add操作。
SelectedSelectionKeySet的初始化
netty通过反射的方式,把Selector对象内部的selectedKeys和publicSelectedKeys替换为SelectedSelectionKeySet。
原本的selectedKeys和publicSelectedKeys这两个字段都是HashSet类型,替换之后变成了SelectedSelectionKeySet。当有就绪的key时,会直接填充到SelectedSelectionKeySet的数组中。后续只需要遍历即可。
1 | java复制代码private SelectorTuple openSelector() { |
异步任务的执行流程
分析完上面的流程后,我们继续来看NioEventLoop中的run方法中,针对异步任务的处理流程
1 | java复制代码@Override |
runAllTask
需要注意,NioEventLoop可以支持定时任务的执行,通过nioEventLoop.schedule()
来完成。
1 | java复制代码protected boolean runAllTasks() { |
fetchFromScheduledTaskQueue
遍历scheduledTaskQueue中的任务,添加到taskQueue中。
1 | java复制代码private boolean fetchFromScheduledTaskQueue() { |
任务添加方法execute
NioEventLoop内部有两个非常重要的异步任务队列,分别是普通任务和定时任务队列,针对这两个队列提供了两个方法分别向两个队列中添加任务。
- execute()
- schedule()
其中,execute方法的定义如下。
1 | java复制代码private void execute(Runnable task, boolean immediate) { |
Nio的空轮转问题
所谓的空轮训,是指我们在执行selector.select()
方法时,如果没有就绪的SocketChannel时,当前线程会被阻塞 。 而空轮询是指当没有就绪SocketChannel时,会被触发唤醒。
而这个唤醒是没有任何读写请求的,从而导致线程在做无效的轮询,使得CPU占用率较高。
导致这个问题的根本原因是:
在部分Linux的2.6的kernel中,poll和epoll对于突然中断的连接socket会对返回的eventSet事件集合置为POLLHUP,也可能是POLLERR,eventSet事件集合发生了变化,这就可能导致Selector会被唤醒。这是与操作系统机制有关系的,JDK虽然仅仅是一个兼容各个操作系统平台的软件,但很遗憾在JDK5和JDK6最初的版本中(严格意义上来将,JDK部分版本都是),这个问题并没有解决,而将这个帽子抛给了操作系统方,这也就是这个bug最终一直到2013年才最终修复的原因,最终影响力太广。
Netty是如何解决这个问题的呢?我们回到NioEventLoop的run方法中
1 | java复制代码@Override |
unexpectedSelectorWakeup
1 | java复制代码private boolean unexpectedSelectorWakeup(int selectCnt) { |
rebuildSelector()
1 | java复制代码public void rebuildSelector() { |
rebuildSelector0
这个方法的主要作用: 重新创建一个选择器,替代当前事件循环中的选择器
1 | java复制代码private void rebuildSelector0() { |
从上述过程中我们发现,Netty解决NIO空轮转问题的方式,是通过重建Selector对象来完成的,在这个重建过程中,核心是把Selector中所有的SelectionKey重新注册到新的Selector上,从而巧妙的避免了JDK epoll空轮训问题。
连接的建立及处理过程
在9.2.4.3节中,提到了当客户端有连接或者读事件发送到服务端时,会调用NioMessageUnsafe类的read()方法。
1 | java复制代码public void read() { |
pipeline.fireChannelRead(readBuf.get(i))
继续来看pipeline的触发方法,此时的pipeline组成,如果当前是连接事件,那么pipeline = ServerBootstrap$ServerBootstrapAcceptor。
1 | java复制代码static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { |
ServerBootstrapAcceptor
ServerBootstrapAcceptor是NioServerSocketChannel中一个特殊的Handler,专门用来处理客户端连接事件,该方法中核心的目的是把针对SocketChannel的handler链表,添加到当前NioSocketChannel中的pipeline中。
1 | java复制代码public void channelRead(ChannelHandlerContext ctx, Object msg) { |
pipeline的构建过程
9.6.2节中,child其实就是一个NioSocketChannel,它是在NioServerSocketChannel中,当接收到一个新的链接时,创建对象。
1 | java复制代码@Override |
而NioSocketChannel在构造时,调用了父类AbstractChannel中的构造方法,初始化了一个pipeline.
1 | java复制代码protected AbstractChannel(Channel parent) { |
DefaultChannelPipeline
pipeline的默认实例是DefaultChannelPipeline,构造方法如下。
1 | java复制代码protected DefaultChannelPipeline(Channel channel) { |
初始化了一个头节点和尾节点,组成一个双向链表,如图9-2所示
图9-2
NioSocketChannel中handler链的构成
再回到ServerBootstrapAccepter的channelRead方法中,收到客户端连接时,触发了NioSocketChannel中的pipeline的添加
以下代码是DefaultChannelPipeline的addLast方法。
1 | java复制代码@Override |
addLast
把服务端配置的ChannelHandler,添加到pipeline中,注意,此时的pipeline中保存的是ChannelInitializer回调方法。
1 | java复制代码@Override |
这个回调方法什么时候触发调用呢?其实就是在ServerBootstrapAcceptor
这个类的channelRead方法中,注册当前NioSocketChannel时
1 | java复制代码childGroup.register(child).addListener(new ChannelFutureListener() {} |
最终按照之前我们上一节课源码分析的思路,定位到AbstractChannel中的register0方法中。
1 | java复制代码private void register0(ChannelPromise promise) { |
callHandlerAddedForAllHandlers
pipeline.invokeHandlerAddedIfNeeded()方法,向下执行,会进入到DefaultChannelPipeline这个类中的callHandlerAddedForAllHandlers方法中
1 | java复制代码private void callHandlerAddedForAllHandlers() { |
我们发现,pendingHandlerCallbackHead这个单向链表,是在callHandlerCallbackLater方法中被添加的,
而callHandlerCallbackLater又是在addLast方法中添加的,所以构成了一个异步完整的闭环。
ChannelInitializer.handlerAdded
task.execute()方法执行路径是
callHandlerAdded0 -> ctx.callHandlerAdded ->
——-> AbstractChannelHandlerContext.callHandlerAddded()
—————> ChannelInitializer.handlerAdded
调用initChannel方法来初始化NioSocketChannel中的Channel.
1 | java复制代码@Override |
接着,调用initChannel抽象方法,该方法由具体的实现类来完成。
1 | java复制代码private boolean initChannel(ChannelHandlerContext ctx) throws Exception { |
ChannelInitializer的实现,是我们自定义Server中的匿名内部类,ChannelInitializer。因此通过这个回调来完成当前NioSocketChannel的pipeline的构建过程。
1 | java复制代码public static void main(String[] args){ |
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自
Mic带你学架构
!
如果本篇文章对您有帮助,还请帮忙点个关注和赞,您的坚持是我不断创作的动力。欢迎关注同名微信公众号获取更多技术干货!
本文转载自: 掘金