『Netty核心』数据交互源码解读

「这是我参与11月更文挑战的第30天,活动详情查看:2021最后一次更文挑战」。

点赞再看,养成习惯👏👏

1、NioEventLoopGroup

1
java复制代码EventLoopGroup bossGroup = new NioEventLoopGroup(1);

初始化的时候如果有传线程数量的话以传的为主,如果没有传含有的子线程NioEventLoop的个数默认为cpu核数的两倍。

1
2
3
4
5
6
7
java复制代码private static final int DEFAULT_EVENT_LOOP_THREADS = 
Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads",
NettyRuntime.availableProcessors() * 2));

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

这里的children是NioEventLoopGroup的一个成员变量,通过for循环一个个赋值,

1
2
java复制代码this.children = new EventExecutor[nThreads];
this.children[i] = this.newChild((Executor)executor, args);

所以跟上面介绍的类似,一个NioEventLoopGroup里面有多个NioEventLoop

1
2
3
4
5
java复制代码NioEventLoopGroup.class
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider)args[0]
((SelectStrategyFactory)args[1]).newSelectStrategy(), (RejectedExecutionHandler)args[2]);
}

一个NioEventLoop里面有Selector和TaskQueue

1
2
3
4
5
6
7
8
9
10
java复制代码SingleThreadEventExecutor.class
this.taskQueue = this.newTaskQueue(this.maxPendingTasks);
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return new LinkedBlockingQueue(maxPendingTasks);
}

NioEventLoop.class
this.provider = selectorProvider;
NioEventLoop.SelectorTuple selectorTuple = this.openSelector();//调用nio方法
this.selector = selectorTuple.selector;

2、ServerBootstrap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码//创建服务器端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来配置参数
bootstrap.group(bossGroup,workerGroup) //设置两个线程组
.channel(NioServerSocketChannel.class) //使用NioServerSocketChannel作为服务器的通道实现
//初始化服务器连接队列大小,服务器处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
//多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChannelInitializer<SocketChannel>() { //创建通道初始化对象,设置初始化参数
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//对workGroup的SocketChannel设置处理器
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});

上面本质就是通过链式编程的形式给ServerBootstrap的各个成员变量赋初值,以便在后续使用。

3、NioServerSocketChannel

NioServerSocketChannel初始化主要返回ServerSocketChannel,这步也是对nio的封装。

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码NioServerSocketChannel.class
private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
try {
//ServerSocketChannel serverSocket = ServerSocketChannel.open();
return provider.openServerSocketChannel();
} catch (IOException var2) {
throw new ChannelException("Failed to open a server socket.", var2);
}
}

public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

同时也执行this.pipeline = this.newChannelPipeline();为该成员变量pipeline赋初始值,通过pipeline将各个handler串连起来,这里只是初始化tail和head,后续调用的时候会使用到。

1
2
3
4
5
6
7
8
9
java复制代码protected DefaultChannelPipeline(Channel channel) {
this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
this.voidPromise = new VoidChannelPromise(channel, true);
this.tail = new DefaultChannelPipeline.TailContext(this);
this.head = new DefaultChannelPipeline.HeadContext(this);
this.head.next = this.tail;
this.tail.prev = this.head;
}

同时也将该channel设置为非阻塞和接收连接,也是对nio的封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
//readInterestOp传进来是16也就是SelectionKey.OP_ACCEPT,这里只是普通的赋值,还没进行绑定
this.readInterestOp = readInterestOp;

try {
//serverSocket.configureBlocking(false);
ch.configureBlocking(false);
} catch (IOException var7) {
try {
ch.close();
} catch (IOException var6) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a partially initialized socket.", var6);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", var7);
}
}

4、源码剖析图:

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%