Netty服务端启动流程分析

Netty服务端启动流程分析

前言

Netty 服务端启动和交互的逻辑的底层实现是借助于Java NIO ServerSocketChannel来实现,Java NIO ServerSocketChannel作为服务端的绑定端口、接受客户端的连接,这篇文章将详细介绍Netty服务端的启动流程,从源码的角度一步步分析。

doBind(final SocketAddress localAddress)方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private ChannelFuture doBind(final SocketAddress localAddress) {
//1.初始化和注册Channel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}

if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
//2.绑定端口
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();

doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}

doBind这个函数是我们要分析的重点,这个函数的主要工作有如下几点:

  • 1、通过initAndRegister()方法得到一个ChannelFuture的实例regFuture。

  • 2、通过regFuture.cause()方法判断是否在执行initAndRegister方法时产生来异常。如果产生来异常,则直接返回,如果没有产生异常则进行第3步。

  • 3、通过regFuture.isDone()来判断initAndRegister方法是否执行完毕,如果执行完毕来返回true,然后调用doBind0进行socket绑定。如果没有执行完毕则返回false进行第4步。

  • 4、regFuture会添加一个ChannelFutureListener监听,当initAndRegister执行完成时,调用operationComplete方法并执行doBind0进行socket绑定。

第3、4点想干的事就是一个:调用doBind0方法进行socket绑定。

下面将分成4部分对每行代码具体做了哪些工作进行详细分析。

第1部分,initAndRegister()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//创建一个channel并初始化
channel = channelFactory.newChannel();
//init逻辑,如果是服务端则调用@ServerBootStrap的init方法,如果是客户端则调用Bootstrap的init方法
/**
* init逻辑:
* 如果是服务端则调用{@link ServerBootstrap#init(Channel channel)}方法,
* 如果是客户端则调用{@link Bootstrap#init(Channel channel)}方法
*/
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}

ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}

// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.

return regFuture;
}

在initAndRegister()函数中,通过函数名字也能看出来主要就三个过程:

  • 1.创建channel
1
channel = channelFactory.newChannel();
  • 2.初始化channel

    1
    init(channel);
  • 3.向group注册channel:

1
ChannelFuture regFuture = config().group().register(channel);

下面我们来分别分析下上面的三个过程:

一、创建channel:channelFactory.newChannel();

我们知道b.channel(NioServerSocketChannel.class)的功能为:设置父类属性channelFactory为: ReflectiveChannelFactory类的对象。其中这里ReflectiveChannelFactory对象中包括一个clazz属性为:NioServerSocketChannel.class

因此,channel = channelFactory.newChannel();就是调用的ReflectiveChannelFactory类中的newChannel()方法,该方法的源码为:

1
2
3
4
5
6
7
8
@Override
public T newChannel() {
try {
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}

通过这个方法,我们可以得出结论,是通过反射机制来产生一个NioServerSocketChannel类的实例。

下面将看下NioServerSocketChannel类的构造函数做了哪些工作。

调用它的无参构造函数:

1
2
3
4
5
6
/**
* Create a new instance
*/
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

无参构造函数中SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider()

函数newSocket的功能为:利用SelectorProvider产生一个SocketChannelImpl对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}

public ServerSocketChannel openServerSocketChannel() throws IOException {
return new ServerSocketChannelImpl(this);
}

然后继续调用有参构造函数以及父类的构造函数,调用过程如下:

1
2
3
4
5
6
7
/**
* Create a new instance using the given {@link ServerSocketChannel}.
*/
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
  • 父类AbstractNioMessageChannel构造函数
1
2
3
4
5
6
/**
* @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)
*/
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
  • 父类AbstractNioChannel构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Create a new instance
*
* @param parent the parent {@link Channel} by which this instance was created. May be {@code null}
* @param ch the underlying {@link SelectableChannel} on which it operates
* @param readInterestOp the ops to set to receive data from the {@link SelectableChannel}
*/
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}

throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
  • 父类AbstractChannel构造函数
1
2
3
4
5
6
7
8
9
10
11
12
/**
* Creates a new instance.
*
* @param parent
* the parent of this channel. {@code null} if there's no parent.
*/
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}

通过分析NioServerSocketChannel实例化过程,调用那么多的构造函数其实主要干了五件事情:

1、产生来一个SocketChannelImpl类的实例,并设置为非阻塞的

1
ch.configureBlocking(false);

2、设置了config属性

1
config = new NioServerSocketChannelConfig(this, javaChannel().socket());

3、设置SelectionKey.OP_ACCEPT事件

1
this.readInterestOp = readInterestOp;//SelectionKey.OP_ACCEPT

4、设置unsafe属性

1
2
3
4
5
unsafe = newUnsafe();
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe();
}

主要作用为:用来负责底层的connect、register、read和write等操作。

5、设置pipeline属性

1
pipeline = new DefaultChannelPipeline(this);

每个Channel都有自己的pipeline,当有请求事件发生时,pipeline负责调用相应的hander进行处理。

这些属性在后面都会用到,至于NioServerSocketChannel 对象中的unsafe、pipeline属性的具体实现后面进行分析。

结论:final Channel channel = channelFactory().newChannel();这行代码的作用为通过反射产生来一个NioServerSocketChannel类的实例,其中这个NioServerSocketChannel类对象有这样几个属性:SocketChannel、NioServerSocketChannelConfig 、SelectionKey.OP_ACCEPT事件、NioMessageUnsafe、DefaultChannelPipeline

二、初始化channel:init(channel);

因为这篇文章是分析Netty服务端的启动流程,因此会进入ServerBootstrap#init()方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}

final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}

ChannelPipeline p = channel.pipeline();

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}

p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

该函数的功能主要功能包括如下几点:

  • 1.设置channel的options
1
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();

options可能如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);

if (option == CONNECT_TIMEOUT_MILLIS) {
setConnectTimeoutMillis((Integer) value);
} else if (option == MAX_MESSAGES_PER_READ) {
setMaxMessagesPerRead((Integer) value);
} else if (option == WRITE_SPIN_COUNT) {
setWriteSpinCount((Integer) value);
} else if (option == ALLOCATOR) {
setAllocator((ByteBufAllocator) value);
} else if (option == RCVBUF_ALLOCATOR) {
setRecvByteBufAllocator((RecvByteBufAllocator) value);
} else if (option == AUTO_READ) {
setAutoRead((Boolean) value);
} else if (option == AUTO_CLOSE) {
setAutoClose((Boolean) value);
} else if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
setWriteBufferHighWaterMark((Integer) value);
} else if (option == WRITE_BUFFER_LOW_WATER_MARK) {
setWriteBufferLowWaterMark((Integer) value);
} else if (option == MESSAGE_SIZE_ESTIMATOR) {
setMessageSizeEstimator((MessageSizeEstimator) value);
} else {
return false;
}

return true;
}
  • 2.设置channel的attrs

如果没有设置,则attrs为空,该属性在ServerBootstrap类中的定义如下

1
private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
  • 3.设置handler到channel的pipeline上
1
2
3
4
5
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
  • 4.在pipeline上添加来一个ChannelInitializer对象,其中重写来initChannel方法。该方法通过p.addLast()向serverChannel的流水线处理器中加入了一个 ServerBootstrapAcceptor

    从名字上就可以看出来,这是一个接入器,专门接受新请求,把新的请求扔给某个事件循环器

1
2
3
4
5
6
7
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});

看到这里,我们发现其实init只是初始化了一些基本的配置和属性,以及在pipeline上加入了一个接入器,用来专门接受新连接,并没有启动服务.

看到这里,我们发现其实init只是初始化了一些基本的配置和属性,以及在pipeline上加入了一个接入器,用来专门接受新连接,并没有启动服务.

三、注册channel:config().group().register(channel);

前面的分析我们知道group为:NioEvenLoopGroup,其继承MultithreadEventLoopGroup,该类中的register方法如下:

1
2
3
4
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}

next()方法的代码如下,其功能为选择下一个NioEventLoop对象,这里的next方法调用了父类MultithreadEventExecutorGroup的next方法

1
2
3
4
@Override
public EventExecutor next() {
return chooser.next();
}

NioEventLoopGroup分析中,我们在MultithreadEventExecutorGroup类的构造函数中看到:根据线程个数nThreads是否为2的幂次方来选择chooser,其中这两个chooser为: PowerOfTwoEventExecutorChooserGenericEventExecutorChooser

小结:由于NioEventLoopGroup中维护着多个NioEventLoop,next方法回调用chooser策略找到下一个NioEventLoop,并执行该对象的register方法进行注册。

由于NioEventLoop extends SingleThreadEventLoop,NioEventLoop没有重写该方法,因此看 SingleThreadEventLoop类中的register方法:

1
2
3
4
5
6
7
8
9
10
11
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}

。。。。。。

因此,channel.unsafe().register(this, promise)这行代码调用的是AbstractUnsafe类中的register方法,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}

AbstractChannel.this.eventLoop = eventLoop;

if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

register()方法的核心实现思路为:

  • 1、通过调用eventLoop.inEventLoop()方法判断当前线程是否为该EventLoop中拥有的线程,如果是,则直接注册,如果不是,说明该EventLoop在等待并没有执行权,则进行第2步。

AbstractEventExecutor

1
2
3
4
@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}

SingleThreadEventExecutor

1
2
3
4
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
  • 2、既然该EventLoop中的线程此时没有执行权,但是我们可以提交一个任务到该线程中,等该EventLoop的线程有执行权的时候就自然而然的会执行此任务,而该任务负责调用register0方法,这样也就达到了调用register0方法的目的。具体为:任务OneTimeTask子类被提交到NioEventLoop线程中执行,然后调用此任务的run方法,进而调用register0方法,其中promise = new DefaultChannelPromise(channel, this)。

下面看register0()这个方法,具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;

// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();

safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

在上面的代码中,是通过调用doRegister()方法完成NioServerSocketChannel的注册,该方法的具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}

第2部分,doBind0(regFuture, channel, localAddress, promise)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {

// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}

会执行channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE)这行代码,这行代码完成的功能为:实现channel与端口的绑定。

AbstractChannel

1
2
3
4
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}

在该方法中直接调用了pipeline的bind方法,通过前面的分析我们知道这里的pipeline是DefaultChannelPipeline的实例。

1
2
3
4
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}

继续看tail实例的bind()方法,调用的是TailContext的父类AbstractChannelHandlerContext的bind()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}

final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}

上面bind函数中的这行代码:final AbstractChannelHandlerContext next = findContextOutbound();所完成的任务就是在pipeline所持有的以AbstractChannelHandlerContext为节点的双向链表中从尾节点tail开始向前寻找第一个outbound=true的handler节点。 findContextOutbound方法找到的 AbstractChannelHandlerContext 对象其实就是 head。然后调用invokeBind()方法:

1
2
3
4
5
6
7
8
9
10
11
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}

继续调用HeadContext类的bind()方法:

1
2
3
4
5
6
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}

通过前面的分析我们知道unsafe是NioMessageUnsafe的示例,bind方法是继承自AbstractUnsafe类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();

if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}

// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}

boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}

if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}

safeSetSuccess(promise);
}

上面的核心代码就是:doBind(localAddress);需要注意的是,此doBind方法是在NioServerSocketChannel类中的doBind方法,不是其他类中的。

NioServerSocketChannel类中的doBind方法代码如下:

1
2
3
4
5
6
7
8
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}

上面方法中javaChannel()方法返回的是NioServerSocketChannel实例初始化时所产生的Java NIO ServerSocketChannel实例(更具体点为ServerSocketChannelImple实例)。 等价于语句serverSocketChannel.socket().bind(localAddress)完成了指定端口的绑定,这样就开始监听此端口。

__如果对Java NIO层面的服务端绑定端口对端口的监听、客户端的连接以及交互不太清晰,可以看博文Java NIO 之 ServerSocketChannel/SocketChannel __,这里介绍了 Java NIO 层面的服务端对端口的监听、客户端与服务端之间的连接以及客户端于服务端之间的交互。

说到这里本来应该就结束了,但是还有如下的一点需要说明:即绑定端口成功后,是这里调用了我们自定义handler的channelActive方法。更多可以结合博文:

这样就真正的完成了端口的绑定。在绑定之前,isActive()方法返回false,绑定之后返回true。

这样就进入了如下的if条件的代码块中:

1
2
3
4
5
6
7
8
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}

进而开始执行 pipeline.fireChannelActive();这行代码 ,这行代码的具体调用链如下所示:

1
2
DefaultChannelPipeline#fireChannelActive(); ---> 
AbstractChannelHandlerContext#invokeChannelActive()

总结

整个流程还是挺复杂的,把Netty服务端的启动流程大体梳理了一遍,还有很多小细节没有具体分析,在接下来的学习过程中我还会继续完善这篇文章。

REFERENCES

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×