(三)集群容错

xiaoxiao2021-02-28  75

集群容错

1.失败转移

配置:cluster="failover"

/**  * 失败转移,当出现失败,重试其它服务器,通常用于读操作,但重试会带来更长延迟。  * <a href="http://en.wikipedia.org/wiki/Failover">Failover</a>  * @author william.liangf  */ public class FailoverCluster implements Cluster {     public final static String NAME = "failover";     public <T> Invoker<T> join(Directory<T> directory) throws RpcException {         return new FailoverClusterInvoker<T>(directory);     } } public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> { public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {         List<Invoker<T>> copyinvokers = invokers;         checkInvokers(copyinvokers, invocation);         int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;         if (len <= 0) {             len = 1;         }         // retry loop.         RpcException le = null; // last exception.         List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.         Set<String> providers = new HashSet<String>(len);         for (int i = 0; i < len; i++) {             //重试时,进行重新选择,避免重试时invoker列表已发生变化.             //注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变             if (i > 0) {                 checkWhetherDestroyed();                 copyinvokers = list(invocation);                 //重新检查一下                 checkInvokers(copyinvokers, invocation);             }             //1处,从提供者列表中选择出一个提供者             Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);             //2处,加入已选择的提供者列表             invoked.add(invoker);             RpcContext.getContext().setInvokers((List)invoked);             try {                 Result result = invoker.invoke(invocation);                 if (le != null && logger.isWarnEnabled()) {                     logger.warn("Although retry the method " + invocation.getMethodName()                             + " in the service " + getInterface().getName()                             + " was successful by the provider " + invoker.getUrl().getAddress()                             + ", but there have been failed providers " + providers                             + " (" + providers.size() + "/" + copyinvokers.size()                             + ") from the registry " + directory.getUrl().getAddress()                             + " on the consumer " + NetUtils.getLocalHost()                             + " using the dubbo version " + Version.getVersion() + ". Last error is: "                             + le.getMessage(), le);                 }                 return result;             } catch (RpcException e) {                 if (e.isBiz()) { // biz exception.                     throw e;                 }                 le = e;             } catch (Throwable e) {                 le = new RpcException(e.getMessage(), e);             } finally {                 providers.add(invoker.getUrl().getAddress());             }         }         throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "                 + invocation.getMethodName() + " in the service " + getInterface().getName()                 + ". Tried " + len + " times of the providers " + providers                 + " (" + providers.size() + "/" + copyinvokers.size()                 + ") from the registry " + directory.getUrl().getAddress()                 + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "                 + Version.getVersion() + ". Last error is: "                 + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);          } }

看源码的“1处”,会从提供者列表中选择一个提供者,并且在“2处”,加入已选择列表,这便于重新选择提供者,避开已选过提供者。

下面看看选择一个提供者的实现方法的源码:

 

protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {     if (invokers == null || invokers.size() == 0)         return null;     String methodName = invocation == null ? "" : invocation.getMethodName();          boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName,Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY) ;     {         //ignore overloaded method         if ( stickyInvoker != null && !invokers.contains(stickyInvoker) ){             stickyInvoker = null;         }         //ignore cucurrent problem         if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))){             if (availablecheck && stickyInvoker.isAvailable()){                 return stickyInvoker;             }         }     }     Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);          if (sticky){         stickyInvoker = invoker;     }     return invoker; } private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { if (invokers == null || invokers.size() == 0) return null; if (invokers.size() == 1) return invokers.get(0); // 如果只有两个invoker,退化成轮循 if (invokers.size() == 2 && selected != null && selected.size() > 0) { return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0); } //3处,loadbalance负载均衡策略选择出一个提供者 Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation); //4处,如果被选中的提供者,已经被选择过,则会重新选择一个提供者(调用reselect方法)。 //如果 selected中包含(优先判断) 或者 不可用&&availablecheck=true 则重试. if( (selected != null && selected.contains(invoker)) ||(!invoker.isAvailable() && getUrl()!=null && availablecheck)){ try{ Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck); if(rinvoker != null){ invoker = rinvoker; }else{ //看下第一次选的位置,如果不是最后,选+1位置. int index = invokers.indexOf(invoker); try{ //最后在避免碰撞 invoker = index <invokers.size()-1?invokers.get(index+1) :invoker; }catch (Exception e) { logger.warn(e.getMessage()+" may because invokers list dynamic change, ignore.",e); } } }catch (Throwable t){ logger.error("clustor relselect fail reason is :"+t.getMessage() +" if can not slove ,you can set cluster.availablecheck=false in url",t); } } return invoker; }

在源码的“3处”,会用loadbalance负载均衡策略选择出一个提供者,在“4处”,如果被选中的提供者,已经被选择过,则会重新选择一个提供者(调用reselect方法)。

 

 

 

从应用本地提供者列表选择一个提供者(如何选择,查看##负载均衡),

如果调用提供者失败,则重试,重试次数通过retries="2"(默认2次)来配置,重试调用提供者也是从提供者列表中选择一个(如何选择,查看##负载均衡)。 如果只有两个提供者,则集群方式变成轮询,即是第1次失败,则第2次重试会调用另一个提供者

重试时指定的提供者是从未调用的提供列表里获取的。

2.快速失败

配置:cluster="failfast"

/** * 快速失败,只发起一次调用,失败立即报错,通常用于非幂等性的写操作。 * <a href="http://en.wikipedia.org/wiki/Fail-fast">Fail-fast</a> * @author william.liangf */ public class FailfastCluster implements Cluster { public final static String NAME = "failfast"; public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new FailfastClusterInvoker<T>(directory); } }

下面看看FailfastClusterInvoker的实现源码:

public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T>{ public FailfastClusterInvoker(Directory<T> directory) { super(directory); } public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { checkInvokers(invokers, invocation); //21处,选择一个提供者,如何选择一个提供者,这是由负载均衡部分来实现的,可以看上面源码的doselect方法。 Invoker<T> invoker = select(loadbalance, invocation, invokers, null); try { //22处,这里只调用了一次,成功则返回结果,失败则抛出异常。 return invoker.invoke(invocation); } catch (Throwable e) { if (e instanceof RpcException && ((RpcException)e).isBiz()) { // biz exception. throw (RpcException) e; } throw new RpcException(e instanceof RpcException ? ((RpcException)e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); } } }

看看上面的源码, “21处”,选择一个提供者,如何选择一个提供者,这是由负载均衡部分来实现的,可以看上面源码的doselect方法。 “22处”,这里只调用了一次,成功则返回结果,失败则抛出异常。  

 

3.失败安全

配置:cluster="failsafe"

/** * 失败安全,出现异常时,直接忽略,通常用于写入审计日志等操作。 * <a href="http://en.wikipedia.org/wiki/Fail-safe">Fail-safe</a> * @author william.liangf */ public class FailsafeCluster implements Cluster { public final static String NAME = "failsafe"; public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new FailsafeClusterInvoker<T>(directory); } }

下面看看FailsafeClusterInvoker的实现:

public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T>{ private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class); public FailsafeClusterInvoker(Directory<T> directory) { super(directory); } public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { try { checkInvokers(invokers, invocation); Invoker<T> invoker = select(loadbalance, invocation, invokers, null); return invoker.invoke(invocation); } catch (Throwable e) { logger.error("Failsafe ignore exception: " + e.getMessage(), e); //31处,如果异常,则返回结果为空的RpcResult对象。 return new RpcResult(); // ignore } } }

看看源码的“31处”,如果调用异常,则返回结果为空的RpcResult对象,这样不至于有异常时影响消费者,这只是当消费者不在意返回结果的情况才使用这种集群容错方式。  

 

4.并行调用

配置:cluster="forking"

/** * 并行调用,只要一个成功即返回,通常用于实时性要求较高的操作,但需要浪费更多服务资源。 * <a href="http://en.wikipedia.org/wiki/Fork_(topology)">Fork</a> * @author william.liangf */ public class ForkingCluster implements Cluster { public final static String NAME = "forking"; public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new ForkingClusterInvoker<T>(directory); } }

下面看看ForkingClusterInvoker的源码实现:

public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T>{ private final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("forking-cluster-timer", true)); public ForkingClusterInvoker(Directory<T> directory) { super(directory); } @SuppressWarnings({ "unchecked", "rawtypes" }) public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { checkInvokers(invokers, invocation); final List<Invoker<T>> selected; final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS); final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (forks <= 0 || forks >= invokers.size()) { selected = invokers; } else { selected = new ArrayList<Invoker<T>>(); for (int i = 0; i < forks; i++) { //在invoker列表(排除selected)后,如果没有选够,则存在重复循环问题.见select实现. Invoker<T> invoker = select(loadbalance, invocation, invokers, selected); if(!selected.contains(invoker)){//防止重复添加invoker selected.add(invoker); } } } RpcContext.getContext().setInvokers((List)selected); final AtomicInteger count = new AtomicInteger(); final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>(); //41处,循环调用列表中的每一个提供者 for (final Invoker<T> invoker : selected) { executor.execute(new Runnable() { public void run() { try { Result result = invoker.invoke(invocation); //42处,如果有结果返回则放到LinkedBlockingQueue队列里 ref.offer(result); } catch(Throwable e) { int value = count.incrementAndGet(); if (value >= selected.size()) { //43处,如果调用抛出异常,也相当于有结果返回,也会往队列放一个异常结果。 ref.offer(e); } } } }); } try { //44处,队列阻塞直到有结果,如果是异常,则会抛出异常,有结果则返回。 Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS); if (ret instanceof Throwable) { Throwable e = (Throwable) ret; throw new RpcException(e instanceof RpcException ? ((RpcException)e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); } return (Result) ret; } catch (InterruptedException e) { throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e); } } }

看看上面的源码, 线程池里,会异步地调用每一个提供者, “41处”,循环调用列表中的每一个提供者,不管调用是有结果还是抛出异常,都会放到队列里, “42处”,如果有结果返回则放到LinkedBlockingQueue队列里, “43处”,如果调用抛出异常,也相当于有结果返回,也会往队列放一个异常结果。 “44处”,队列阻塞直到有结果,如果是异常,则会抛出异常,有结果则返回。  

 

5.失败自动恢复

配置: cluster="failback"

/** * 失败自动恢复,后台记录失败请求,定时重发,通常用于消息通知操作。 * <a href="http://en.wikipedia.org/wiki/Failback">Failback</a> * @author william.liangf */ public class FailbackCluster implements Cluster { public final static String NAME = "failback"; public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new FailbackClusterInvoker<T>(directory); } }

下面看看FailbackClusterInvoker的源码实现:

public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> { private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class); private static final long RETRY_FAILED_PERIOD = 5 * 1000; private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2, new NamedThreadFactory("failback-cluster-timer", true)); private volatile ScheduledFuture<?> retryFuture; private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>(); public FailbackClusterInvoker(Directory<T> directory){ super(directory); } private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) { //53处,第一次调用addFailed方法时,retryFutrue为null,会启动定时器scheduledExecutorService if (retryFuture == null) { synchronized (this) { if (retryFuture == null) { //54处,定时器5000毫秒执行一次retryFailed方法,retryFailed方法会循环failed Map对象里的失败调用,再次调用。 retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { public void run() { // 收集统计信息 try { retryFailed(); } catch (Throwable t) { // 防御性容错 logger.error("Unexpected error occur at collect statistic", t); } } }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS); } } } //55处,将调用放入Map对象failed中。 failed.put(invocation, router); } void retryFailed() { if (failed.size() == 0) { return; } for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>( failed).entrySet()) { Invocation invocation = entry.getKey(); Invoker<?> invoker = entry.getValue(); try { invoker.invoke(invocation); failed.remove(invocation); } catch (Throwable e) { logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e); } } } protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { try { checkInvokers(invokers, invocation); Invoker<T> invoker = select(loadbalance, invocation, invokers, null); //51处,会根据负载均衡策略得到一个提供者,调用提供者。 return invoker.invoke(invocation); } catch (Throwable e) { logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + e.getMessage() + ", ", e); //52处,如果调用异常,则加入异常Map中,以备定时器定时重新调用提供者。 addFailed(invocation, this); return new RpcResult(); // ignore } } }

从FailbackClusterInvoker的源码中, “51处”,会根据负载均衡策略得到一个提供者,调用提供者。如果调用失败异常了,会执行“52处”,如果调用异常,则加入异常Map中,以备定时器定时重新调用提供者。 调用异常进入方法addFailed,在“53处”,第一次调用addFailed方法时,retryFutrue为null,会启动定时器scheduledExecutorService,接着在源码“54处”,定时器5000毫秒执行一次retryFailed方法,retryFailed方法会循环failed Map对象里的失败调用,再次调用。在“55处”,就会将调用放入Map对象failed中。

自己写了个RPC:

https://github.com/nytta

可以给个star,^-^.

转载请注明原文地址: https://www.6miu.com/read-59088.html

最新回复(0)