Netty服务端接收连接过程分析

Netty服务端接收连接过程分析

前面《Netty服务端启动流程分析》这篇文章主要介绍了Netty服务端的启动过程,那么这里有个问题:启动服务端之后,有新的连接向服务端发起请求,服务端是如何接收并处理这些连接请求的?带着这个问题我将结合netty源码分析Netty服务端接收连接的过程。

这篇文章我将主要讲解一下几个部分:

  • 1、netty如何接收新的请求
  • 2、netty如何给新的请求分配reactor线程
  • 3、netty如何给每个新连接天剑ChannelHandler

检测到有新连接进入

我们已经知道,当服务端绑启动之后,服务端的channel已经注册到boos reactor线程中,reactor不断检测有新的事件,直到检测出有accept事件发生;

NioEventLoop.java

1
2
3
4
5
6
7
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
int readyOps = k.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
}

上面这段代码是reactor线程三部曲中的第二部曲,表示boos reactor线程已经轮询到 SelectionKey.OP_ACCEPT 事件,说明有新的连接进入,此时将调用channel的 unsafe来进行实际的操作

关于 unsafe,这篇文章我不打算细讲,下面是netty作者对于unsafe的解释

Unsafe operations that should never be called from user-code. These methods are only provided to implement the actual transport.

你只需要了解一个大概的概念,就是所有的channel底层都会有一个与unsafe绑定,每种类型的channel实际的操作都由unsafe来实现

而从上一篇文章,服务端的启动过程中,我们已经知道,服务端对应的channel的unsafe是 NioMessageUnsafe,那么,我们进入到它的read方法,进入新连接处理的第二步

注册到worker reactor线程

NioMessageUnsafe.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private final List<Object> readBuf = new ArrayList<Object>();

public void read() {
assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
} while (allocHandle.continueReading());
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
pipeline.fireChannelReadComplete();
}

我省去了非关键部分的代码,可以看到,一上来,就用一条断言确定该read方法必须是reactor线程调用,然后拿到channel对应的pipeline和 RecvByteBufAllocator.Handle(先不解释)

接下来,调用 doReadMessages 方法不断地读取消息,用 readBuf 作为容器,这里,其实可以猜到读取的是一个个连接,然后调用 pipeline.fireChannelRead(),将每条新连接经过一层服务端channel的洗礼

之后清理容器,触发 pipeline.fireChannelReadComplete(),整个过程清晰明了,不含一丝杂质,下面我们具体看下这两个方法:

  • 1.doReadMessages(List)

  • 2.pipeline.fireChannelRead(NioSocketChannel)

doReadMessages(List)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());

try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);

try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}

return 0;
}

这段代码应该很熟悉,通过调用jdk底层nio的方法SocketUtils.accept(javaChannel());,由于netty中reactor线程第一步就扫描到有accept事件发生,因此,这里的accept方法是立即返回的,返回jdk底层nio创建的一条channel。

netty将jdk的 SocketChannel 封装成自定义的 NioSocketChannel,加入到list里面,这样外层就可以遍历该list,做后续处理;

关于NioSocketChannel的创建过程可以参考《Netty客户端连接过程源码分析》,在创建出一条NioSocketChannel之后,放置在List容器里,就开始进行下一步操作

pipeline.fireChannelRead(NioSocketChannel)

关于pipline的介绍可以看另一篇文章《》,通过《Netty服务端启动流程分析》这篇文章我们知道,在服务端处理新连接的pipeline中,已经自动添加了一个pipeline处理器 ServerBootstrapAcceptor, 并已经将用户代码中设置的一系列的参数传入了构造函数,接下来,我们就来看下ServerBootstrapAcceptor

ServerBootstrapAcceptor.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
private final Runnable enableAutoReadTask;

ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;

// Task which is scheduled to re-enable auto-read.
// It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
// not be able to load the class because of the file limit it already reached.
//
// See https://github.com/netty/netty/issues/1328
enableAutoReadTask = new Runnable() {
@Override
public void run() {
channel.config().setAutoRead(true);
}
};
}

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;

child.pipeline().addLast(childHandler);

setChannelOptions(child, childOptions, logger);

for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}

try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
}

前面的 pipeline.fireChannelRead(NioSocketChannel); 最终通过head->unsafe->ServerBootstrapAcceptor的调用链,调用到这里的 ServerBootstrapAcceptorchannelRead方法

注册读事件

References

https://blog.csdn.net/u013160932/article/details/80474486

netty源码分析之新连接接入全解析

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×