/** * @see #connect() */ private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress){ //第一步、执行初始化channel,并注册channel final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel();
if (regFuture.isDone()) { if (!regFuture.isSuccess()) { return regFuture; } //第二步、执行连接 return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise()); } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); //添加一个listener,当执行完成回调该listener regFuture.addListener(new ChannelFutureListener() { @Override publicvoidoperationComplete(ChannelFuture future)throws Exception { // Directly obtain the cause and do a null check so we only need one volatile read in case of a // failure. Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); //第二步、执行连接 doResolveAndConnect0(channel, remoteAddress, localAddress, promise); } } }); return promise; } }
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise){ try { final EventLoop eventLoop = channel.eventLoop(); final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) { // Resolver has no idea about what to do with the specified remote address or it's resolved already. doConnect(remoteAddress, localAddress, promise); return promise; }
final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
if (resolveFuture.isDone()) { final Throwable resolveFailureCause = resolveFuture.cause();
if (resolveFailureCause != null) { // Failed to resolve immediately channel.close(); promise.setFailure(resolveFailureCause); } else { // Succeeded to resolve immediately; cached? (or did a blocking lookup) doConnect(resolveFuture.getNow(), localAddress, promise); } return promise; }
// Wait until the name resolution is finished. resolveFuture.addListener(new FutureListener<SocketAddress>() { @Override publicvoidoperationComplete(Future<SocketAddress> future)throws Exception { if (future.cause() != null) { channel.close(); promise.setFailure(future.cause()); } else { doConnect(future.getNow(), localAddress, promise); } } }); } catch (Throwable cause) { promise.tryFailure(cause); } return promise; }
最终会执行 `doConnect` 方法:
privatestaticvoiddoConnect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise){
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. final Channel channel = connectPromise.channel(); channel.eventLoop().execute(new Runnable() { @Override publicvoidrun(){ if (localAddress == null) { channel.connect(remoteAddress, connectPromise); } else { channel.connect(remoteAddress, localAddress, connectPromise); } connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } }); }
@Override public ChannelFuture connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise){
if (remoteAddress == null) { thrownew NullPointerException("remoteAddress"); } if (isNotValidPromise(promise, false)) { // cancelled return promise; }
final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeConnect(remoteAddress, localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override publicvoidrun(){ next.invokeConnect(remoteAddress, localAddress, promise); } }, promise, null); } return promise; }
此函数中的 `final AbstractChannelHandlerContext next = findContextOutbound();` 这行代码所完成的任务,是在 pipeline 所持有的、以 `AbstractChannelHandlerContext` 为节点的双向链表中,从尾节点 tail 开始向前查找第一个 outbound=true 的 handler 节点。