"main@1" prio=5 tid=0x1 nid=NA runnable java.lang.Thread.State: RUNNABLE at org.apache.dubbo.remoting.transport.netty4.NettyChannel.send(NettyChannel.java:101) at org.apache.dubbo.remoting.transport.AbstractClient.send(AbstractClient.java:265) at org.apache.dubbo.remoting.transport.AbstractPeer.send(AbstractPeer.java:53) at org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.request(HeaderExchangeChannel.java:116) at org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeClient.request(HeaderExchangeClient.java:90) at org.apache.dubbo.rpc.protocol.dubbo.ReferenceCountExchangeClient.request(ReferenceCountExchangeClient.java:83) at org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker.doInvoke(DubboInvoker.java:108) at org.apache.dubbo.rpc.protocol.AbstractInvoker.invoke(AbstractInvoker.java:154) at org.apache.dubbo.rpc.listener.ListenerInvokerWrapper.invoke(ListenerInvokerWrapper.java:77) at org.apache.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:75) at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:72) at org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter.invoke(FutureFilter.java:47) at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:72) at org.apache.dubbo.rpc.filter.ConsumerContextFilter.invoke(ConsumerContextFilter.java:50) at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:72) at org.apache.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:56) at org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:78) at org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:243) at org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:75) at org.apache.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:70) at org.apache.dubbo.common.bytecode.proxy0.sayHello(proxy0.java:-1) at cn.shuaijunlan.dubbo.learning.main.Main.main(Main.java:16)
@Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance)throws RpcException { List<Invoker<T>> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); String methodName = RpcUtils.getMethodName(invocation); //设置重试次数 int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. Set<String> providers = new HashSet<String>(len); for (int i = 0; i < len; i++) { //Reselect before retry to avoid a change of candidate `invokers`. //NOTE: if `invokers` changed, then `invoked` also lose accuracy. if (i > 0) { checkWhetherDestroyed(); copyinvokers = list(invocation); // check again checkInvokers(copyinvokers, invocation); } //根据负载均衡策略选择调用者 Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); //将使用过的invoker放入invoked invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + methodName + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } thrownew RpcException(le.getCode(), "Failed to invoke the method " + methodName + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le); }
所以会有如下的线程栈信息:
1 2 3
at org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:78) at org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:243) at org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:75)
/** * Convert the invokerURL list to the Invoker Map. The rules of the conversion are as follows: * 1.If URL has been converted to invoker, it is no longer re-referenced and obtained directly from the cache, and notice that any parameter changes in the URL will be re-referenced. * 2.If the incoming invoker list is not empty, it means that it is the latest invoker list * 3.If the list of incoming invokerUrl is empty, It means that the rule is only a override rule or a route rule, which needs to be re-contrasted to decide whether to re-reference. * * @param invokerUrls this parameter can't be null */ // TODO: 2017/8/31 FIXME The thread pool should be used to refresh the address, otherwise the task may be accumulated. privatevoidrefreshInvoker(List<URL> invokerUrls){ if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { //禁止访问 this.forbidden = true; // Forbid to access //置空列表 this.methodInvokerMap = null; // Set the method invoker map to null //关闭所有invokers destroyAllInvokers(); // Close all invokers } else { //允许访问 this.forbidden = false; // Allow to access Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) { invokerUrls.addAll(this.cachedInvokerUrls); } else { this.cachedInvokerUrls = new HashSet<URL>(); //缓存invokerUrls列表,便于交叉对比 this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison } if (invokerUrls.isEmpty()) { return; } //生成invoker方法 toInvokers //将url列表转换成invoker列表 Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map //换方法名映射invoker列表 Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map // state change // If the calculation is wrong, it is not processed. //如果计算错误则不进行处理 if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) { logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString())); return; } this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap; this.urlInvokerMap = newUrlInvokerMap; try { //关闭不使用的invoker destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker } catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e); } } }
/** * The delegate class, which is mainly used to store the URL address sent by the registry,and can be reassembled on the basis of providerURL queryMap overrideMap for re-refer. * 代理类,主要用于存储注册中心下发的URL地址 * 用于重新refer时能够根据providerURL queryMap overrideMap重新组装 * @param <T> */ privatestaticclassInvokerDelegate<T> extendsInvokerWrapper<T> { private URL providerUrl;
at org.apache.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:75) at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:72) at org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter.invoke(FutureFilter.java:47) at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:72) at org.apache.dubbo.rpc.filter.ConsumerContextFilter.invoke(ConsumerContextFilter.java:50) at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:72)
@Override public Result invoke(Invocation inv)throws RpcException { if (destroyed.get()) { thrownew RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + " is DESTROYED, can not be invoked any more!"); } RpcInvocation invocation = (RpcInvocation) inv; invocation.setInvoker(this); if (attachment != null && attachment.size() > 0) { invocation.addAttachmentsIfAbsent(attachment); } Map<String, String> contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { /** * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here, * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information). */ invocation.addAttachments(contextAttachments); } if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) { invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString()); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
Result result; if (isAsyncFuture) { // register resultCallback, sometimes we need the asyn result being processed by the filter chain. result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); } else { result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); } return result; } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { thrownew RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { thrownew RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }
所以会有这两句线程栈输出:
1 2
at org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker.doInvoke(DubboInvoker.java:108) at org.apache.dubbo.rpc.protocol.AbstractInvoker.invoke(AbstractInvoker.java:154)
private ExchangeClient[] getClients(URL url) { // whether to share connection //是否共享连接 boolean service_share_connect = false; int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); // if not configured, connection is shared, otherwise, one connection for one service //如果没有配置connection,那么就创建共享连接,否则一个服务一个连接? if (connections == 0) { service_share_connect = true; connections = 1; }
ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (service_share_connect) { clients[i] = getSharedClient(url); } else { clients[i] = initClient(url); } } return clients; } /** * Get shared connection */ private ExchangeClient getSharedClient(URL url){ String key = url.getAddress(); ReferenceCountExchangeClient client = referenceClientMap.get(key); if (client != null) { if (!client.isClosed()) { client.incrementAndGetCount(); return client; } else { referenceClientMap.remove(key); } }
locks.putIfAbsent(key, new Object()); synchronized (locks.get(key)) { if (referenceClientMap.containsKey(key)) { return referenceClientMap.get(key); }
// BIO is not allowed since it has severe performance issue. //BIO存在严重的性能问题,因此不能使用 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { thrownew RpcException("Unsupported client type: " + str + "," + " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " ")); }
ExchangeClient client; try { // connection should be lazy if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) { client = new LazyConnectExchangeClient(url, requestHandler); } else { //通过 Exchangers.connect(url, requestHandler); 构建client ,接下来跟踪Exchangers.connect方法 //这里会传入一个requestHandler,这个是客户端解救服务端方法返回回调的 client = Exchangers.connect(url, requestHandler); } } catch (RemotingException e) { thrownew RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); } return client; }
@Override public ExchangeClient connect(URL url, ExchangeHandler handler)throws RemotingException { returnnew HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); }
调用Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler)))生成channel实例,这里Transporters也是个门面类,是Facade设计模式的实现,继续分析Transporters的connect方法:
@Override publicvoidsend(Object message, boolean sent)throws RemotingException { if (send_reconnect && !isConnected()) { connect(); } //获取具体的channel实例 Channel channel = getChannel(); //TODO Can the value returned by getChannel() be null? need improvement. if (channel == null || !channel.isConnected()) { thrownew RemotingException(this, "message can not send, because channel is closed . url:" + getUrl()); } channel.send(message, sent); }
这里的getChannel()方法由NettyClient自身实现,如下:
1 2 3 4 5 6 7
@Override protected org.apache.dubbo.remoting.Channel getChannel(){ Channel c = channel; if (c == null || !c.isActive()) returnnull; return NettyChannel.getOrAddChannel(c, getUrl(), this); }
所以有如下栈信息:
1 2 3
at org.apache.dubbo.remoting.transport.netty4.NettyChannel.send(NettyChannel.java:101) at org.apache.dubbo.remoting.transport.AbstractClient.send(AbstractClient.java:265) at org.apache.dubbo.remoting.transport.AbstractPeer.send(AbstractPeer.java:53)