Netty客户端连接过程源码分析

Netty客户端连接过程源码分析

这篇文章主要分析Netty客户端连接服务端的过程,并结合Netty框架的源码,在Netty源码中我们看一下连接服务端的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});

// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();

// Wait until the connection is closed.
f.channel().closeFuture().sync();

上面的代码中这句ChannelFuture f = b.connect(HOST, PORT).sync();才是发起连接,这句之前的代码是设置连接的一些属性,这里不详细讲解这个属性,这篇文章的核心是关于连接过程。

我们追踪代码,首先进入Bootstrap类的connect方法,

1
2
3
4
5
6
7
8
9
10
11
12
/**
* Connect a {@link Channel} to the remote peer.
*/
public ChannelFuture connect(SocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
//验证handler、group、channelFactory是否为空
validate();
//执行连接
return doResolveAndConnect(remoteAddress, config.localAddress());
}

验证的逻辑比较简单,核心部分是doResolveAndConnect(remoteAddress, config.localAddress());

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* @see #connect()
*/
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
//第一步、执行初始化channel,并注册channel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();

if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
//第二步、执行连接
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
//添加一个listener,当执行完成回调该listener
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Directly obtain the cause and do a null check so we only need one volatile read in case of a
// failure.
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
//第二步、执行连接
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
}

如果看了《Netty服务端启动流程分析》这篇文章,就会对这这段代码非常熟悉,它们的逻辑一模一样,总结来说就是两个过程:

  • 1、初始化并注册channel:final ChannelFuture regFuture = initAndRegister();
  • 2、进一步执行连接:doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());

第一个过程在这篇文章就不详细讲了,可以参考《Netty服务端启动流程分析》这篇文章,下面来详细分析第二过程。

doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) {
try {
final EventLoop eventLoop = channel.eventLoop();
final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);

if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
// Resolver has no idea about what to do with the specified remote address or it's resolved already.
doConnect(remoteAddress, localAddress, promise);
return promise;
}

final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);

if (resolveFuture.isDone()) {
final Throwable resolveFailureCause = resolveFuture.cause();

if (resolveFailureCause != null) {
// Failed to resolve immediately
channel.close();
promise.setFailure(resolveFailureCause);
} else {
// Succeeded to resolve immediately; cached? (or did a blocking lookup)
doConnect(resolveFuture.getNow(), localAddress, promise);
}
return promise;
}

// Wait until the name resolution is finished.
resolveFuture.addListener(new FutureListener<SocketAddress>() {
@Override
public void operationComplete(Future<SocketAddress> future) throws Exception {
if (future.cause() != null) {
channel.close();
promise.setFailure(future.cause());
} else {
doConnect(future.getNow(), localAddress, promise);
}
}
});
} catch (Throwable cause) {
promise.tryFailure(cause);
}
return promise;
}

最终会执行doConnect方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}

在该方法中,会在channel所关联到的eventLoop 线程中调用channel.connect方法,注意这里的channel是NioSocketChannel实例。

1
2
3
4
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, localAddress, promise);
}

这里的pipeline是DefaultChannelPipeline实例,继续看此pipeline的connect方法:

1
2
3
4
5
@Override
public final ChannelFuture connect(
SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, localAddress, promise);
}

在博文Netty源码分析:ChannelPipeline中有关于tail的详细介绍,这里回顾下:tail是TailContext的实例,继承于AbstractChannelHandlerContext,TailContext并没有实现connect方法,因此这里调用的是其父类AbstractChannelHandlerContext的connect方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}

final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null);
}
return promise;
}

此函数中的这行代码:final AbstractChannelHandlerContext next = findContextOutbound();所完成的任务就是在pipeline所持有的以AbstractChannelHandlerContext为节点的双向链表中从尾节点tail开始向前寻找第一个outbound=true的handler节点。

看过上篇博文之后,我相信我们都知道这个outbound=ture的节点时哪一个?是head节点,为什么呢?在博文 Netty源码分析:ChannelPipeline中我们知道在DefaultChannelPipeline 的构造器中, 会实例化两个对象: head 和 tail, 并形成了双向链表的头和尾. head 是 HeadContext 的实例, 它实现了 ChannelOutboundHandler 接口, 即head实例的 outbound = true. 因此在调用上面 findContextOutbound()方法时, 找到的符合outbound=true的节点其实就是 head。

继续看,在pipelie的双向链表中找到第一个outbound=true的AbstractChannelHandlerContext节点head后,然后调用此节点的invokeConnect方法,该方法的代码如下,

1
2
3
4
5
6
7
8
9
10
11
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
connect(remoteAddress, localAddress, promise);
}
}

继续看,看HeadContext类中的connect方法,代码如下:

1
2
3
4
5
6
7
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}

继续看NioByteUnsafe类中的 connect方法(准确的说此方法是在AbstractNioUnsafe类中)

1
2
3
4
5
6
7
8
9
10
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
//...
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
//...
}
}

上面只保留了connect的关键代码,相关检查和连接失败的代码省略了,上面这个函数主要是调用了doConnect这个方法,需要注意的是,此方法并不是 AbstractNioUnsafe 的方法, 而是 AbstractNioChannel 的抽象方法. doConnect 方法是在 NioSocketChannel 中实现的。

NioSocketChannel类中的doConnect方法的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}

boolean success = false;
try {
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}

上面方法中javaChannel()方法返回的是NioSocketChannel实例初始化时所产生的Java NIO SocketChannel实例(更具体点为SocketChannelImpl实例)。 然后调用此实例的connect方法完成Java NIO层面上的Socket连接。

REFERENCES

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×