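The previous article, 《Dubbo消费者调用过程源码分析》 (a source-code analysis of the Dubbo consumer invocation process), showed that the invoker object is generated when the proxy is created, and that the cluster strategy is bound at that point. Here is the code that generates the invoker, in the method `ReferenceConfig#createProxy(Map<String, String> map)`: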
```java
if (url != null && url.length() > 0) {
    // A URL was specified by the user: a direct-connection address or a registry address.
    String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
    if (us != null && us.length > 0) {
        for (String u : us) {
            URL url = URL.valueOf(u);
            if (url.getPath() == null || url.getPath().length() == 0) {
                url = url.setPath(interfaceName);
            }
            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
            } else {
                urls.add(ClusterUtils.mergeUrl(url, map));
            }
        }
    }
} else {
    // No URL specified: build the URLs from the registry configuration.
    List<URL> us = loadRegistries(false);
    if (us != null && !us.isEmpty()) {
        for (URL u : us) {
            URL monitorUrl = loadMonitor(u);
            if (monitorUrl != null) {
                map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
            }
            urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
        }
    }
    if (urls.isEmpty()) {
        throw new IllegalStateException("No such any registry to reference " + interfaceName
                + " on the consumer " + NetUtils.getLocalHost()
                + " use dubbo version " + Version.getVersion()
                + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
    }
}

if (urls.size() == 1) {
    // Entry 1: a single URL (registry or direct connection).
    invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
    URL registryURL = null;
    for (URL url : urls) {
        invokers.add(refprotocol.refer(interfaceClass, url));
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            registryURL = url;
        }
    }
    if (registryURL != null) {
        // Entry 2: multiple registries, joined with AvailableCluster.
        URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
        invoker = cluster.join(new StaticDirectory(u, invokers));
    } else {
        // Entry 3: multiple direct-connection addresses.
        invoker = cluster.join(new StaticDirectory(invokers));
    }
}
```
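This code has three entry points at which the cluster strategy is set up. When `urls` contains exactly one element, that URL may be either a registry address or a direct-connection address of the service, and the code goes on to execute `invoker = refprotocol.refer(interfaceClass, urls.get(0));`. When `urls` contains more than one element, they may be several registry addresses or several direct-connection addresses. For multiple registry addresses, the code executes:

```java
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
```

For multiple direct-connection addresses, it executes `invoker = cluster.join(new StaticDirectory(invokers));`. These three paths are analyzed in detail below.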
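The branching described above can be condensed into a small standalone sketch. It is not Dubbo code: `ReferEntrySketch`, `Entry`, and `choose` are hypothetical names, URLs are modelled as plain strings, and the only detail taken from the source is that a URL whose protocol is `registry` marks the registry path.

```java
import java.util.List;

// Hypothetical, simplified model of the branching in createProxy; not Dubbo's API.
class ReferEntrySketch {
    enum Entry { SINGLE_URL, MULTI_REGISTRY, MULTI_DIRECT }

    // Decide which of the three entry points a list of URL strings would take.
    static Entry choose(List<String> urls) {
        if (urls.isEmpty()) {
            throw new IllegalStateException("no URL to refer");
        }
        if (urls.size() == 1) {
            return Entry.SINGLE_URL; // refprotocol.refer(interfaceClass, urls.get(0))
        }
        boolean hasRegistry = false;
        for (String u : urls) {
            // Mirrors Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())
            if (u.startsWith("registry://")) {
                hasRegistry = true;
            }
        }
        // As in the source, a single registry URL among the list is enough to take the registry path.
        return hasRegistry ? Entry.MULTI_REGISTRY : Entry.MULTI_DIRECT;
    }
}
```

Note that a mixed list (registry and direct addresses together) takes the registry path, because the source only checks whether `registryURL` ended up non-null.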
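Dubbo provides seven cluster modes (FailoverCluster, FailfastCluster, FailsafeCluster, FailbackCluster, ForkingCluster, BroadcastCluster, AvailableCluster). Their inheritance relationships are shown in the following diagram:

[Figure: inheritance diagram of the cluster implementations]

Each cluster mode is analyzed in turn below.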
FailoverCluster

```java
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyinvokers = invokers;
    checkInvokers(copyinvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    // Total attempts = configured retries + the first call.
    int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    RpcException le = null; // last exception seen
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invokers already tried
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        // Before each retry, list the invokers again so that provider changes are picked up.
        if (i > 0) {
            checkWhetherDestroyed();
            copyinvokers = list(invocation);
            checkInvokers(copyinvokers, invocation);
        }
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method " + methodName
                        + " in the service " + getInterface().getName()
                        + " was successful by the provider " + invoker.getUrl().getAddress()
                        + ", but there have been failed providers " + providers
                        + " (" + providers.size() + "/" + copyinvokers.size()
                        + ") from the registry " + directory.getUrl().getAddress()
                        + " on the consumer " + NetUtils.getLocalHost()
                        + " using the dubbo version " + Version.getVersion()
                        + ". Last error is: " + le.getMessage(), le);
            }
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // business exceptions are never retried
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException(le.getCode(), "Failed to invoke the method " + methodName
            + " in the service " + getInterface().getName()
            + ". Tried " + len + " times of the providers " + providers
            + " (" + providers.size() + "/" + copyinvokers.size()
            + ") from the registry " + directory.getUrl().getAddress()
            + " on the consumer " + NetUtils.getLocalHost()
            + " using the dubbo version " + Version.getVersion()
            + ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
```
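The overall logic of the Failover fault-tolerance mechanism is as follows. The number of attempts is bounded by the method's retry setting (`retries + 1`). Each time a call fails, the load-balancing strategy selects another provider (invoker), until a call succeeds or the attempts are exhausted, in which case an exception is thrown. If a business exception occurs along the way, no further retry is made and the exception is thrown directly.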
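To make the retry loop easier to follow without the Dubbo types, here is a minimal standalone sketch of the same idea. Everything in it is an assumption made for illustration: providers are modelled as `Supplier`s, `BizException` stands in for an `RpcException` whose `isBiz()` is true, and `select` is a naive replacement for the real load balancer that simply prefers providers not yet tried.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of failover; names and types are not Dubbo's.
class FailoverSketch {
    /** Stand-in for a business exception, which must not be retried. */
    static class BizException extends RuntimeException {
        BizException(String msg) { super(msg); }
    }

    static <R> R invoke(List<Supplier<R>> providers, int retries) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        int attempts = Math.max(retries + 1, 1); // retries + the first call
        RuntimeException last = null;
        List<Supplier<R>> invoked = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            Supplier<R> provider = select(providers, invoked);
            invoked.add(provider);
            try {
                return provider.get();
            } catch (BizException e) {
                throw e;  // business failure: rethrow without retrying
            } catch (RuntimeException e) {
                last = e; // remember the failure and try another provider
            }
        }
        throw new IllegalStateException("Tried " + attempts + " times, all failed", last);
    }

    // Naive stand-in for load balancing: prefer a provider that has not been tried yet.
    static <R> Supplier<R> select(List<Supplier<R>> providers, List<Supplier<R>> invoked) {
        for (Supplier<R> p : providers) {
            if (!invoked.contains(p)) {
                return p;
            }
        }
        return providers.get(0);
    }
}
```

With `retries = 2` and a list whose first provider always fails, the second attempt lands on the next provider and its result is returned; with `retries = 0` the first failure is final.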
FailfastCluster

```java
@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    checkInvokers(invokers, invocation);
    Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
    try {
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        // Business exceptions are rethrown unchanged.
        if (e instanceof RpcException && ((RpcException) e).isBiz()) {
            throw (RpcException) e;
        }
        // Anything else is wrapped and thrown immediately, with no retry.
        throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
                + " select from all providers " + invokers
                + " for service " + getInterface().getName()
                + " method " + invocation.getMethodName()
                + " on consumer " + NetUtils.getLocalHost()
                + " use dubbo version " + Version.getVersion()
                + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                e.getCause() != null ? e.getCause() : e);
    }
}
```
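Fail fast: only one call is made, and a failure is reported immediately. It is typically used for non-idempotent write operations, such as inserting a record.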
FailsafeCluster

```java
@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        // Log the failure and return an empty result instead of propagating it.
        logger.error("Failsafe ignore exception: " + e.getMessage(), e);
        return new RpcResult();
    }
}
```
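Fail safe: when an exception occurs, it is logged and otherwise ignored, and an empty `RpcResult` is returned. It is typically used for operations such as writing audit logs.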
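Failsafe and failfast both make a single attempt and differ only in what happens to the failure. The sketch below puts the two side by side; it is an illustration under assumptions, not Dubbo code: the provider is a plain `Supplier`, `IllegalStateException` stands in for `RpcException`, and the caller passes the "empty result" explicitly.

```java
import java.util.function.Supplier;

// Hypothetical side-by-side sketch; names and types are not Dubbo's.
class FailsafeVsFailfastSketch {
    // Failfast: one attempt; the failure is wrapped and propagated to the caller.
    static <R> R failfast(Supplier<R> provider) {
        try {
            return provider.get();
        } catch (RuntimeException e) {
            throw new IllegalStateException("Failfast invoke failed: " + e.getMessage(), e);
        }
    }

    // Failsafe: one attempt; the failure is logged and replaced by an empty result.
    static <R> R failsafe(Supplier<R> provider, R emptyResult) {
        try {
            return provider.get();
        } catch (RuntimeException e) {
            System.err.println("Failsafe ignore exception: " + e.getMessage());
            return emptyResult;
        }
    }
}
```

The caller of `failsafe` cannot tell a failed call from a call that legitimately returned the empty value, which is why the strategy suits fire-and-forget work such as audit logging.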
FailbackCluster

```java
private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
    // Lazily start the retry task, using double-checked locking.
    if (retryFuture == null) {
        synchronized (this) {
            if (retryFuture == null) {
                retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {

                    @Override
                    public void run() {
                        // Resend the failed invocations.
                        try {
                            retryFailed();
                        } catch (Throwable t) { // defensive catch so the scheduled task keeps running
                            logger.error("Unexpected error occur at collect statistic", t);
                        }
                    }
                }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
            }
        }
    }
    failed.put(invocation, router);
}

void retryFailed() {
    if (failed.size() == 0) {
        return;
    }
    // Iterate over a copy so that entries can be removed from the original map.
    for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<>(failed).entrySet()) {
        Invocation invocation = entry.getKey();
        Invoker<?> invoker = entry.getValue();
        try {
            invoker.invoke(invocation);
            failed.remove(invocation);
        } catch (Throwable e) {
            logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
        }
    }
}

@Override
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        logger.error("Failback to invoke method " + invocation.getMethodName()
                + ", wait for retry in background. Ignored exception: " + e.getMessage() + ", ", e);
        // Record the failed invocation for the background retry and return an empty result.
        addFailed(invocation, this);
        return new RpcResult();
    }
}
```
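This strategy recovers from failures automatically: failed requests are recorded in the background and resent on a timer. It is typically used for message-notification operations.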
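The record-then-resend pattern can be sketched independently of Dubbo as follows. The class and method names are hypothetical, calls are modelled as `Runnable`s keyed by a string id, and the scheduler is passed in by the caller; only the use of `scheduleWithFixedDelay` and the copy-then-remove loop are taken from the source above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of failback; names and types are not Dubbo's.
class FailbackSketch {
    private final Map<String, Runnable> failed = new ConcurrentHashMap<>();

    // Try once; on failure, remember the call for a later retry and return normally.
    void invoke(String id, Runnable call) {
        try {
            call.run();
        } catch (RuntimeException e) {
            failed.put(id, call);
        }
    }

    // One retry pass: iterate over a copy so entries can be removed from the live map.
    void retryFailed() {
        for (Map.Entry<String, Runnable> entry : new HashMap<>(failed).entrySet()) {
            try {
                entry.getValue().run();
                failed.remove(entry.getKey());
            } catch (RuntimeException e) {
                // Still failing: keep the entry and wait for the next pass.
            }
        }
    }

    // Run retryFailed() periodically on a scheduler supplied by the caller.
    ScheduledFuture<?> start(ScheduledExecutorService scheduler, long periodMillis) {
        return scheduler.scheduleWithFixedDelay(this::retryFailed, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    int pending() {
        return failed.size();
    }
}
```

A caller would typically create the scheduler with `Executors.newScheduledThreadPool(1)`, call `start` once, and shut the scheduler down when the client is destroyed. Unlike the source, the sketch does not start the task lazily on the first failure.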
ForkingCluster

```java
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        checkInvokers(invokers, invocation);
        final List<Invoker<T>> selected;
        final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
        final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (forks <= 0 || forks >= invokers.size()) {
            selected = invokers;
        } else {
            selected = new ArrayList<>();
            for (int i = 0; i < forks; i++) {
                Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                if (!selected.contains(invoker)) { // avoid adding the same invoker twice
                    selected.add(invoker);
                }
            }
        }
        RpcContext.getContext().setInvokers((List) selected);
        final AtomicInteger count = new AtomicInteger();
        final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
        for (final Invoker<T> invoker : selected) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Result result = invoker.invoke(invocation);
                        ref.offer(result);
                    } catch (Throwable e) {
                        // Only publish the exception once every fork has failed.
                        int value = count.incrementAndGet();
                        if (value >= selected.size()) {
                            ref.offer(e);
                        }
                    }
                }
            });
        }
        try {
            // Wait for the first result (or the final failure) up to the timeout.
            Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
            if (ret instanceof Throwable) {
                Throwable e = (Throwable) ret;
                throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                        "Failed to forking invoke provider " + selected
                        + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                        e.getCause() != null ? e.getCause() : e);
            }
            return (Result) ret;
        } catch (InterruptedException e) {
            throw new RpcException("Failed to forking invoke provider " + selected
                    + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
        }
    } finally {
        // Clear attachments: they are bound to the current thread.
        RpcContext.getContext().clearAttachments();
    }
}
```
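Forking calls several servers in parallel and returns as soon as one of them succeeds. It is typically used for read operations with strict latency requirements, at the cost of extra service resources. The maximum number of parallel calls can be set with `forks="2"`.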
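The "first success wins, fail only when all forks fail" mechanism relies on a blocking queue plus a failure counter. Below is a minimal standalone sketch of that mechanism. It is not Dubbo code: providers are modelled as `Callable`s, `IllegalStateException` stands in for `RpcException`, results are assumed to be non-null, and the sketch creates and shuts down its own thread pool per call. It also throws on timeout, whereas the source above returns whatever `poll` yields.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of forking; names and types are not Dubbo's.
class ForkingSketch {
    @SuppressWarnings("unchecked")
    static <R> R invoke(List<Callable<R>> providers, long timeoutMillis) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            AtomicInteger failures = new AtomicInteger();
            BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
            for (Callable<R> provider : providers) {
                executor.execute(() -> {
                    try {
                        ref.offer(provider.call()); // first successful result wins
                    } catch (Throwable e) {
                        // Publish the error only once every fork has failed.
                        if (failures.incrementAndGet() >= providers.size()) {
                            ref.offer(e);
                        }
                    }
                });
            }
            Object ret = ref.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (ret == null) {
                throw new IllegalStateException("forking invoke timed out");
            }
            if (ret instanceof Throwable) {
                throw new IllegalStateException("all forks failed", (Throwable) ret);
            }
            return (R) ret;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for forks", e);
        } finally {
            executor.shutdownNow(); // abandon forks that are still running
        }
    }
}
```

Because a failed fork stays silent unless it is the last one to fail, a single healthy provider is enough for the call to succeed.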
BroadcastCluster

```java
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    checkInvokers(invokers, invocation);
    RpcContext.getContext().setInvokers((List) invokers);
    RpcException exception = null;
    Result result = null;
    // Call every invoker in turn; remember the last failure but keep going.
    for (Invoker<T> invoker : invokers) {
        try {
            result = invoker.invoke(invocation);
        } catch (RpcException e) {
            exception = e;
            logger.warn(e.getMessage(), e);
        } catch (Throwable e) {
            exception = new RpcException(e.getMessage(), e);
            logger.warn(e.getMessage(), e);
        }
    }
    // If any call failed, report the failure after all invokers have been called.
    if (exception != null) {
        throw exception;
    }
    return result;
}
```
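Broadcast calls every provider one after another; if any call fails, an exception is thrown after the loop has finished. This strategy is typically used to notify all providers to update local resources such as caches or logs.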
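The key property of broadcast is that a failure on one provider does not stop the remaining calls. A minimal standalone sketch of that behaviour, with providers modelled as `Supplier`s and `RuntimeException` standing in for `RpcException` (both assumptions, not Dubbo types):

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of broadcast; names and types are not Dubbo's.
class BroadcastSketch {
    static <R> R invoke(List<Supplier<R>> providers) {
        RuntimeException last = null;
        R result = null;
        for (Supplier<R> provider : providers) {
            try {
                result = provider.get(); // the last successful result is kept
            } catch (RuntimeException e) {
                last = e;                // remember the failure but keep calling the rest
            }
        }
        if (last != null) {
            throw last;                  // report only after everyone has been called
        }
        return result;
    }
}
```

As in the source, only the last exception is reported when several providers fail, and the result returned on success is the one from the last provider.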
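AvailableCluster

The main logic of the Available fault-tolerance mechanism is simply to call the first reachable provider; if none is reachable, an exception is thrown: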
```java
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    return new AbstractClusterInvoker<T>(directory) {
        @Override
        public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            // Use the first invoker that reports itself as available.
            for (Invoker<T> invoker : invokers) {
                if (invoker.isAvailable()) {
                    return invoker.invoke(invocation);
                }
            }
            throw new RpcException("No provider available in " + invokers);
        }
    };
}
```
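The same first-available selection can be written without the Dubbo types. In this sketch the `Provider` interface, the `of` factory, and the use of `IllegalStateException` in place of `RpcException` are all hypothetical and exist only for illustration.

```java
import java.util.List;

// Hypothetical sketch of the available strategy; names and types are not Dubbo's.
class AvailableSketch {
    interface Provider<R> {
        boolean isAvailable();
        R invoke();
    }

    // Small factory for a provider with a fixed availability flag and result.
    static <R> Provider<R> of(final boolean available, final R value) {
        return new Provider<R>() {
            @Override public boolean isAvailable() { return available; }
            @Override public R invoke() { return value; }
        };
    }

    // Call the first provider that reports itself as available; load balancing is not involved.
    static <R> R invoke(List<Provider<R>> providers) {
        for (Provider<R> provider : providers) {
            if (provider.isAvailable()) {
                return provider.invoke();
            }
        }
        throw new IllegalStateException("No provider available");
    }
}
```

If the chosen provider's call itself fails, the exception propagates: neither the sketch nor the source falls through to the next available provider.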