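The previous article, "Source Code Analysis of Dubbo Service Provider Export and Registration", covered how the Dubbo server registers and exports its services. This article describes how the Dubbo server receives requests and responds to them.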
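This article again uses the Consumer-Provider demo to walk through how a request is received and answered. After the Dubbo server exports a service, it listens on a port and waits for client requests. An incoming request is processed by the inbound handler; recall that NettyServerHandler was installed as the inbound handler when the service was exported. The request therefore arrives in NettyServerHandler#channelRead(), which picks up the request message. Its implementation is: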
```java
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // Look up (or create) the Dubbo channel wrapper for this Netty channel
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        // Pass the message on to the handler chain
        handler.received(channel, msg);
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}
```
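Before going further, here is a flow diagram of the whole process of receiving and handling a request. The rest of the analysis follows this diagram:

[Figure: flow of receiving and handling a request]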
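As the call stack at the end of this article shows, the message is handed to a ChannelEventRunnable that runs on a thread pool and passes through DecodeHandler before it reaches HeaderExchangeHandler#received(). Here is that method: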
```java
@Override
public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
            Request request = (Request) message;
            if (request.isEvent()) {
                // Event message
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
                    // Two-way request: a response must be sent back
                    handleRequest(exchangeChannel, request);
                } else {
                    // One-way request: no response is sent
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: "
                        + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                // Telnet command on the server side
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}
```
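The branching in received() can be easier to follow in isolation. The following is a minimal sketch, not Dubbo code: `DemoRequest`, `DemoResponse`, and `DispatchSketch` are hypothetical stand-ins introduced only for illustration, and each branch returns a label instead of calling a real handler.

```java
// Hypothetical, simplified stand-ins for Dubbo's Request/Response (not the real classes).
final class DemoRequest {
    final boolean event;
    final boolean twoWay;
    final Object data;

    DemoRequest(boolean event, boolean twoWay, Object data) {
        this.event = event;
        this.twoWay = twoWay;
        this.data = data;
    }
}

final class DemoResponse {
    final Object result;

    DemoResponse(Object result) {
        this.result = result;
    }
}

final class DispatchSketch {
    // Returns a label naming the branch taken, in the same order of checks as received().
    static String dispatch(Object message, boolean clientSide) {
        if (message instanceof DemoRequest) {
            DemoRequest request = (DemoRequest) message;
            if (request.event) {
                return "event";
            }
            return request.twoWay ? "two-way request" : "one-way request";
        } else if (message instanceof DemoResponse) {
            return "response";
        } else if (message instanceof String) {
            return clientSide ? "string rejected on client" : "telnet command";
        }
        return "passed through";
    }

    public static void main(String[] args) {
        System.out.println(dispatch(new DemoRequest(false, true, "sayHello"), false));
        System.out.println(dispatch(new DemoRequest(false, false, "notify"), false));
        System.out.println(dispatch("ls", false));
    }
}
```

Only the two-way branch leads to a response being written back, which is why the analysis continues with handleRequest().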
Next we analyze the request-response mode, that is, the handleRequest() method, which is invoked for two-way requests:
```java
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    // The request could not be decoded: answer with BAD_REQUEST
    if (req.isBroken()) {
        Object data = req.getData();

        String msg;
        if (data == null) msg = null;
        else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
        else msg = data.toString();
        res.setErrorMessage("Fail to decode request due to: " + msg);
        res.setStatus(Response.BAD_REQUEST);

        channel.send(res);
        return;
    }

    Object msg = req.getData();
    try {
        // Invoke the service; the result is delivered through a future
        CompletableFuture<Object> future = handler.reply(channel, msg);
        // Result already available: send the response right away
        if (future.isDone()) {
            res.setStatus(Response.OK);
            res.setResult(future.get());
            channel.send(res);
            return;
        }
        // Result not ready yet: send the response when the future completes
        future.whenComplete((result, t) -> {
            try {
                if (t == null) {
                    res.setStatus(Response.OK);
                    res.setResult(result);
                } else {
                    res.setStatus(Response.SERVICE_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
                channel.send(res);
            } catch (RemotingException e) {
                logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
            } finally {
            }
        });
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
        channel.send(res);
    }
}
```
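The key pattern in handleRequest() is "answer now if the future is already done, otherwise answer from a completion callback". Below is a minimal sketch of that pattern using only the JDK. `ReplySketch` and its `sent` list are hypothetical: the list stands in for channel.send(), and the strings stand in for Response objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ReplySketch {
    // 'sent' is a hypothetical stand-in for channel.send(): it records what would be written back.
    static void respond(CompletableFuture<Object> future, List<String> sent) {
        if (future.isDone()) {
            // Synchronous case: the result (or failure) is already known.
            try {
                sent.add("OK: " + future.join());
            } catch (CompletionException | CancellationException e) {
                sent.add("SERVICE_ERROR: " + e);
            }
            return;
        }
        // Asynchronous case: reply once the future completes, normally or exceptionally.
        future.whenComplete((result, t) -> {
            if (t == null) {
                sent.add("OK: " + result);
            } else {
                sent.add("SERVICE_ERROR: " + t.getMessage());
            }
        });
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();

        respond(CompletableFuture.completedFuture("hello"), sent);

        CompletableFuture<Object> pending = new CompletableFuture<>();
        respond(pending, sent);      // nothing is sent yet
        pending.complete("later");   // the callback fires here

        System.out.println(sent);    // prints [OK: hello, OK: later]
    }
}
```

In this sketch the callback runs on whichever thread completes the future, so in the asynchronous case the response is not necessarily written by the thread that received the request.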
Next, look at the reply() method of the anonymous ExchangeHandlerAdapter inner class in DubboProtocol:
```java
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
    if (message instanceof Invocation) {
        Invocation inv = (Invocation) message;
        // Find the Invoker for the requested service
        Invoker<?> invoker = getInvoker(channel, inv);
        // For a callback invocation, check that the method exists on the callback interface
        if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
            String methodsStr = invoker.getUrl().getParameters().get("methods");
            boolean hasMethod = false;
            if (methodsStr == null || !methodsStr.contains(",")) {
                hasMethod = inv.getMethodName().equals(methodsStr);
            } else {
                String[] methods = methodsStr.split(",");
                for (String method : methods) {
                    if (inv.getMethodName().equals(method)) {
                        hasMethod = true;
                        break;
                    }
                }
            }
            if (!hasMethod) {
                logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                        + " not found in callback service interface ,invoke will be ignored."
                        + " please update the api interface. url is:"
                        + invoker.getUrl()) + " ,invocation is :" + inv);
                return null;
            }
        }
        RpcContext rpcContext = RpcContext.getContext();
        boolean supportServerAsync = invoker.getUrl().getMethodParameter(inv.getMethodName(), Constants.ASYNC_KEY, false);
        if (supportServerAsync) {
            CompletableFuture<Object> future = new CompletableFuture<>();
            rpcContext.setAsyncContext(new AsyncContextImpl(future));
        }
        rpcContext.setRemoteAddress(channel.getRemoteAddress());
        // Invoke the service implementation
        Result result = invoker.invoke(inv);

        if (result instanceof AsyncRpcResult) {
            return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);
        } else {
            // Synchronous result: wrap it in an already-completed future
            return CompletableFuture.completedFuture(result);
        }
    }
    throw new RemotingException(channel, "Unsupported request: "
            + (message == null ? null : (message.getClass().getName() + ": " + message))
            + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
```
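The callback check in reply() compares the invoked method name against the comma-separated "methods" URL parameter. The sketch below isolates that comparison so it can be tried on its own. `CallbackMethodCheck` and `hasMethod` are hypothetical names introduced for illustration; the logic follows the listing above.

```java
final class CallbackMethodCheck {
    // methodsParam is the value of the "methods" URL parameter: a single name,
    // a comma-separated list of names, or null when the parameter is absent.
    static boolean hasMethod(String methodsParam, String methodName) {
        if (methodsParam == null || !methodsParam.contains(",")) {
            // Single name (or nothing): compare directly; equals(null) is false.
            return methodName.equals(methodsParam);
        }
        for (String method : methodsParam.split(",")) {
            if (methodName.equals(method)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(hasMethod("sayHello,sayBye", "sayBye")); // prints true
        System.out.println(hasMethod("sayHello,sayBye", "say"));    // prints false
        System.out.println(hasMethod(null, "sayHello"));            // prints false
    }
}
```

The comparison is an exact string match on each list entry, so a method name that is only a prefix of an entry is not accepted.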
Finally, once the result is available, channel.send() is called to return the data to the service consumer.
Call stack:
```
"DubboServerHandler-211.69.197.55:20881-thread-2@3215" daemon prio=5 tid=0x1a nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at org.apache.dubbo.remoting.transport.netty4.NettyChannel.send(NettyChannel.java:101)
      at org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.send(HeaderExchangeChannel.java:89)
      at org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.send(HeaderExchangeChannel.java:78)
      at org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.handleRequest(HeaderExchangeHandler.java:103)
      at org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.received(HeaderExchangeHandler.java:196)
      at org.apache.dubbo.remoting.transport.DecodeHandler.received(DecodeHandler.java:51)
      at org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable.run(ChannelEventRunnable.java:57)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
```
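The bottom frames of the stack show that the handler chain runs inside a ThreadPoolExecutor worker named DubboServerHandler-…, started from ChannelEventRunnable.run(). The general idea of wrapping a received message in a task and running it on a named worker pool can be sketched with the JDK alone. Everything here (`WorkerHandoffSketch`, the pool size of 2, the `DemoServerHandler` prefix) is a hypothetical illustration, not Dubbo's dispatcher implementation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class WorkerHandoffSketch {
    // Creates a small pool whose threads are named "<prefix>-thread-<n>" (pool size 2 is arbitrary).
    static ExecutorService newPool(String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        return Executors.newFixedThreadPool(2, runnable -> {
            Thread t = new Thread(runnable, prefix + "-thread-" + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        });
    }

    // Wraps the message in a task, runs it on the pool, and reports which thread handled it.
    static String handleOnWorker(ExecutorService pool, Object message) {
        Callable<String> task = () -> Thread.currentThread().getName() + " handled " + message;
        try {
            return pool.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = newPool("DemoServerHandler");
        try {
            System.out.println(handleOnWorker(pool, "ping"));
        } finally {
            pool.shutdown();
        }
    }
}
```

The sketch blocks on the task result only so that it can print it; a real server would return from the I/O callback immediately after submitting the task.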