private ChannelFuture doBind(final SocketAddress localAddress){ //1.初始化和注册Channel final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; }
if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); //2.绑定端口 doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override publicvoidoperationComplete(ChannelFuture future)throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered();
final ChannelFuture initAndRegister(){ Channel channel = null; try { //创建一个channel并初始化 channel = channelFactory.newChannel(); //init逻辑,如果是服务端则调用@ServerBootStrap的init方法,如果是客户端则调用Bootstrap的init方法 /** * init逻辑: * 如果是服务端则调用{@link ServerBootstrap#init(Channel channel)}方法, * 如果是客户端则调用{@link Bootstrap#init(Channel channel)}方法 */ init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor returnnew DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor returnnew DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); }
ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } }
// If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread.
privatestatic ServerSocketChannel newSocket(SelectorProvider provider){ try { /** * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in * {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise. * * See <a href="https://github.com/netty/netty/issues/2308">#2308</a>. */ return provider.openServerSocketChannel(); } catch (IOException e) { thrownew ChannelException( "Failed to open a server socket.", e); } }
public ServerSocketChannel openServerSocketChannel()throws IOException { returnnew ServerSocketChannelImpl(this); }
然后继续调用有参构造函数以及父类的构造函数,调用过程如下:
1 2 3 4 5 6 7
/**
 * Create a new instance using the given {@link ServerSocketChannel}.
 *
 * @param channel the JDK server socket channel this instance wraps; it is handed to the
 *                superclass constructor and its socket is used to build the channel config
 */
public NioServerSocketChannel(ServerSocketChannel channel) {
    // First argument is null (no parent is supplied); OP_ACCEPT is passed as the third
    // argument of the superclass constructor.
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
/**
 * Create a new instance
 *
 * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
 * @param ch                the underlying {@link SelectableChannel} on which it operates
 * @param readInterestOp    the ops to set to receive data from the {@link SelectableChannel}
 * @throws ChannelException if {@code ch} cannot be switched to non-blocking mode; {@code ch} is
 *                          closed (best effort) before the exception is thrown
 */
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        // Switching to non-blocking mode failed: try to close the channel before failing.
        try {
            ch.close();
        } catch (IOException e2) {
            // A failure to close is only logged; the original exception is the one reported.
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
父类AbstractChannel的构造函数如下:
1 2 3 4 5 6 7 8 9 10 11 12
/**
 * Creates a new instance.
 *
 * <p>Besides storing the parent, the constructor assigns {@code id}, {@code unsafe} and
 * {@code pipeline} from {@code newId()}, {@code newUnsafe()} and {@code newChannelPipeline()}.
 *
 * @param parent
 *        the parent of this channel. {@code null} if there's no parent.
 */
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
2、虽然该EventLoop中的线程此时没有执行权,但是我们可以提交一个任务到该线程中,等该EventLoop的线程获得执行权时,自然就会执行此任务;而该任务负责调用register0方法,这样也就达到了调用register0方法的目的。具体为:一个OneTimeTask子类的任务被提交到NioEventLoop线程中执行,该线程调用此任务的run方法,进而调用register0方法,其中promise = new DefaultChannelPromise(channel, this)。
privatevoidregister0(ChannelPromise promise){ try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } elseif (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
@Override protectedvoiddoRegister()throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
privatestaticvoiddoBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise){
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override publicvoidrun(){ if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
@Override publicfinalvoidbind(final SocketAddress localAddress, final ChannelPromise promise){ assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) { return; }
// See: https://github.com/netty/netty/issues/576 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) { // Warn a user about the fact that a non-root user can't receive a // broadcast packet on *nix if the socket is bound on non-wildcard address. logger.warn( "A non-root user can't receive a broadcast packet if the socket " + "is not bound to a wildcard address; binding to a non-wildcard " + "address (" + localAddress + ") anyway as requested."); }