查看原文
其他

Dubbo的负载均衡详解

点击蓝字

关注我们


前言

负载均衡是指在集群中,将多个数据请求分散在不同单元上进行执行,主要为了提高系统容错能力和加强系统对数据的处理能力。Dubbo如何实现负载均衡的呢?


1.dubbo中的负载均衡

在 Dubbo 中,一次服务的调用就是对所有实体域 Invoker 的一次筛选过滤,最终选定具体调用的 Invoker。首先在 Directory 中获取全部 Invoker 列表,通过路由筛选出符合规则的 Invoker,最后再经过负载均衡选出具体的 Invoker。所以 Dubbo 负载均衡机制是决定一次服务调用使用哪个提供者的服务。



dubbo运行的过程中,在两个位置会使用负载均衡:

  1. 有多个注册中心时,客户端使用负载均衡选择其中一个注册中心上注册的服务;

  2. 客户端使用负载均衡选择一个注册中心上注册的多个服务。

这两个位置使用相同的配置选择负载均衡算法,通过

@Reference(loadbalance = "XXXX")

指定,所以这两个位置使用的负载均衡算法是相同的。


2.对多注册中心负载均衡

当存在多个注册中心时,客户端启动的时候,会创建ZoneAwareClusterInvoker对象(该对象不能使用SPI替换)。ZoneAwareClusterInvoker对象包含客户端注册到注册中心的url和MockClusterInvoker对象集合,每个注册中心对应一个MockClusterInvoker对象。


当客户端调用远程服务时,客户端调用ZoneAwareClusterInvoker的doInvoke方法,doInvoke方法使用配置的负载均衡类选择MockClusterInvoker对象集合中的一个对象。


如果通过负载均衡找到的MockClusterInvoker对象是不可用的,dubbo会遍历MockClusterInvoker对象集合,选择集合中第一个可用的对象。


3.对多服务负载均衡

客户端使用MockClusterInvoker对象调用远程服务。默认情况下,MockClusterInvoker对象将调用远程服务的任务委托给FailoverClusterInvoker。FailoverClusterInvoker通过select方法选择合适的服务。select方法的入参包括负载均衡对象,InvokerWrapper对象集合等,其中每个服务对应一个InvokerWrapper对象,InvokerWrapper对象是对远程调用服务的包装类。


4.dubbo中的负载均衡算法详解

LoadBalance(负载均衡)的职责是将网络请求或者其他形式的负载“均摊”到不同的服务节点上,从而避免服务集群中部分节点压力过大、资源紧张,而另一部分节点比较空闲的情况。


通过合理的负载均衡算法,可以让每个服务节点获取到适合自己处理能力的负载,实现处理能力和流量的合理分配。常用的负载均衡可分为软件负载均衡(比如,日常工作中使用的 Nginx)和硬件负载均衡(主要有 F5、Array、NetScaler 等,不过开发工程师在实践中很少直接接触到)。


常见的 RPC 框架中都有负载均衡的概念和相应的实现,Dubbo 也不例外。Dubbo 需要对 Consumer 的调用请求进行分配,避免少数 Provider 节点负载过大,而剩余的其他 Provider 节点处于空闲的状态。因为当 Provider 负载过大时,就会导致一部分请求超时、丢失等一系列问题发生,造成线上故障。


整体结构

Dubbo 负载均衡的分析入口是

org.apache.dubbo.rpc.cluster.loadbalance.AbstractLoadBalance 抽象类,查看这个类继承关系。


  • RandomLoadBalance

  • LeastActiveLoadBalance

  • RoundRobinLoadBalance

  • ShortestResponseLoadBalance

  • ConsistentHashLoadBalance 

被这五个类继承。这5个类是 Dubbo 中提供的五种负载均衡算法的实现。


LoadBalance 是一个扩展接口,默认使用的扩展实现是 RandomLoadBalance,其定义如下所示,其中的 @Adaptive 注解参数为 loadbalance,即动态生成的适配器会按照 URL 中的 loadbalance 参数值选择扩展实现类。

@SPI("random")public interface LoadBalance {
@Adaptive({"loadbalance"}) <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;}


LoadBalance 接口中 select() 方法的核心功能是根据传入的 URL 和 Invocation,以及自身的负载均衡算法,从 Invoker 集合中选择一个 Invoker 返回。


AbstractLoadBalance 抽象类并没有真正实现 select() 方法,只是对 Invoker 集合为空或是只包含一个 Invoker 对象的特殊情况进行了处理,具体实现如下:

public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) { if (CollectionUtils.isEmpty(invokers)) { // Invoker集合为空,直接返回null return null; } else { // Invoker集合只包含一个Invoker,则直接返回该Invoker对象 // Invoker集合包含多个Invoker对象时,交给doSelect()方法处理,这是个抽象方法,留给子类具体实现 return invokers.size() == 1 ? (Invoker)invokers.get(0) : this.doSelect(invokers, url, invocation); } }


另外,AbstractLoadBalance 还提供了一个 getWeight() 方法,该方法用于计算 Provider 权重,具体实现如下:

int getWeight(Invoker<?> invoker, Invocation invocation) {
URL url = invoker.getUrl(); int weight; if ("org.apache.dubbo.registry.RegistryService".equals(url.getServiceInterface())) { // 如果是RegistryService接口的话,直接获取权重即可 weight = url.getParameter("registry.weight", 100); } else { weight = url.getMethodParameter(invocation.getMethodName(), "weight", 100); if (weight > 0) { // 获取服务提供者的启动时间戳 long timestamp = invoker.getUrl().getParameter("timestamp", 0L); if (timestamp > 0L) { // 计算Provider运行时长 long uptime = System.currentTimeMillis() - timestamp; if (uptime < 0L) { return 1; } // 计算Provider预热时长 int warmup = invoker.getUrl().getParameter("warmup", 600000); // 如果Provider运行时间小于预热时间,则该Provider节点可能还在预热阶段, // 需要重新计算服务权重(降低其权重) if (uptime > 0L && uptime < (long)warmup) { weight = calculateWarmupWeight((int)uptime, warmup, weight); } } } }

return Math.max(weight, 0); }

calculateWarmupWeight() 方法的目的是对还在预热状态的 Provider 节点进行降权,避免 Provider 一启动就有大量请求涌进来。服务预热是一个优化手段,这是由 JVM 本身的一些特性决定的,例如,JIT 等方面的优化,我们一般会在服务启动之后,让其在小流量状态下运行一段时间,然后再逐步放大流量。

static int calculateWarmupWeight(int uptime, int warmup, int weight) {
// 计算权重,随着服务运行时间uptime增大,权重ww的值会慢慢接近配置值weight int ww = (int)((float)uptime / ((float)warmup / (float)weight)); return ww < 1 ? 1 : Math.min(ww, weight);}

了解了 LoadBalance 接口的定义以及 AbstractLoadBalance 提供的公共能力之后,下面我们开始逐个介绍 LoadBalance 接口的具体实现。


5.RandomLoadBalance(随机算法)

加权随机算法负载均衡策略(RandomLoadBalance)是 dubbo 负载均衡的默认实现方式,根据权重分配各个 Invoker 随机选中的比例。这里的意思是:将到达负载均衡流程的 Invoker 列表中的 权重进行求和,然后求出单个 Invoker 权重在总权重中的占比,随机数就在总权重值的范围内生成。


如图,假如当前有192.168.1.10和192.168.1.11两个负载均衡的服务,权重分别为 4、6 ,则它们的被选中的比例为 2/5、3/5。

当生成随机数为 6 时,就会选中192.168.1.11的服务。


dubbo 中 RandomLoadBalance的 doSelect 实现代码:


public class RandomLoadBalance extends AbstractLoadBalance {

public static final String NAME = "random";

@Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // Invoker 数量 int length = invokers.size(); // 标识所有 Invoker 的权重是否都一样 boolean sameWeight = true; // 用一个数组保存每个 Invoker 的权重 int[] weights = new int[length]; // 第一个 Invoker 的权重 int firstWeight = getWeight(invokers.get(0), invocation); weights[0] = firstWeight; // 求和总权重 int totalWeight = firstWeight; for (int i = 1; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); // 保存每个 Invoker 的权重到数组总 weights[i] = weight; // 累加求和总权重 totalWeight += weight; // 如果不是所有 Invoker 的权重都一样,就给标记上 sameWeight = false if (sameWeight && weight != firstWeight) { sameWeight = false; } } // 计算随机数取到的 Invoker,条件是必须总权重大于0,并且每个 Invoker 的权重都不一样 if (totalWeight > 0 && !sameWeight) { // 基于 0~总数 范围内生成随机数 int offset = ThreadLocalRandom.current().nextInt(totalWeight); // 计算随机数对应的 Invoker for (int i = 0; i < length; i++) { offset -= weights[i]; if (offset < 0) { return invokers.get(i); } } } // 如果所有 Invoker 的权重都一样则随机从 Invoker 列表中返回一个 return invokers.get(ThreadLocalRandom.current().nextInt(length)); }

}


以上就是加权随机策略的实现,这里比较主要关注计算生成的随机数对应的 Invoker。通过遍历权重数组,生成的数累减当前权重值,当 offset 为 0 时,就表示 offset 对应当前的 Invoker 服务。


以生成的随机数为 6 为例,遍历 Invokers 长度:


第一轮:offset = 6 - 4 = 2 不满足 offset < 0,继续遍历。

第二轮:offset = 2 - 6 = -4 满足 offset < 0,返回当前索引对应的 Invoker。因为 offset 返回负数,表示 offset 落在当前 Invoker 权重的区间里。

加权随机策略并非一定按照比例被选到,理论上调用次数越多,分布的比例越接近权重所占的比例。


6.RoundRobinLoadBalance(轮询算法)

加权轮询负载均衡策略(RoundRobinLoadBalance)是基于权重来决定轮询的比例。普通轮询会将请求均匀的分布在每个节点,但不能很好调节不同性能服务器的请求处理,所以加权负载均衡来根据权重在轮询机制中分配相对应的请求比例给每台服务器。

public class RoundRobinLoadBalance extends AbstractLoadBalance {

public static final String NAME = "roundrobin";
private static final int RECYCLE_PERIOD = 60000;
protected static class WeightedRoundRobin { private int weight; private AtomicLong current = new AtomicLong(0); private long lastUpdate; public int getWeight() { return weight; } public void setWeight(int weight) { this.weight = weight; current.set(0); } public long increaseCurrent() { return current.addAndGet(weight); } public void sel(int total) { current.addAndGet(-1 * total); } public long getLastUpdate() { return lastUpdate; } public void setLastUpdate(long lastUpdate) { this.lastUpdate = lastUpdate; } }

private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>(); private AtomicBoolean updateLock = new AtomicBoolean();
/** * get invoker addr list cached for specified invocation * <p> * <b>for unit test only</b> * * @param invokers * @param invocation * @return */ protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); Map<String, WeightedRoundRobin> map = methodWeightMap.get(key); if (map != null) { return map.keySet(); } return null; }
@Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // key 为 接口名+方法名 String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); // 查看缓存中是否存在相应服务接口的信息,如果没有则新添加一个元素到缓存中 ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key); if (map == null) { methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>()); map = methodWeightMap.get(key); } // 总权重 int totalWeight = 0; long maxCurrent = Long.MIN_VALUE; // 当前时间戳 long now = System.currentTimeMillis(); // 最大 current 的 Invoker Invoker<T> selectedInvoker = null; // 保存选中的 WeightedRoundRobin 对象 WeightedRoundRobin selectedWRR = null; // 遍历 Invokers 列表 for (Invoker<T> invoker : invokers) { // 从缓存中获取 WeightedRoundRobin 对象 String identifyString = invoker.getUrl().toIdentityString(); WeightedRoundRobin weightedRoundRobin = map.get(identifyString); // 获取当前 Invoker 对象 int weight = getWeight(invoker, invocation);

// 如果当前 Invoker 没有对应的 WeightedRoundRobin 对象,则新增一个 if (weightedRoundRobin == null) { weightedRoundRobin = new WeightedRoundRobin(); weightedRoundRobin.setWeight(weight); map.putIfAbsent(identifyString, weightedRoundRobin); } // 如果当前 Invoker 权重不等于对应的 WeightedRoundRobin 对象中的权重,则重新设置当前权重到对应的 WeightedRoundRobin 对象中 if (weight != weightedRoundRobin.getWeight()) { weightedRoundRobin.setWeight(weight); } // 累加权重到 current 中 long cur = weightedRoundRobin.increaseCurrent(); // 设置 weightedRoundRobin 对象最后更新时间 weightedRoundRobin.setLastUpdate(now); // 最大 current 的 Invoker,并赋值给相应的变量 if (cur > maxCurrent) { maxCurrent = cur; selectedInvoker = invoker; selectedWRR = weightedRoundRobin; } // 累加权重到总权重中 totalWeight += weight; } // 如果 Invokers 列表中的数量不等于缓存map中的数量 if (!updateLock.get() && invokers.size() != map.size()) { if (updateLock.compareAndSet(false, true)) { try { // 拷贝 map 到 newMap 中 ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>(); newMap.putAll(map); // newMap 转化为 Iterator Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator(); // 循环删除超过设定时长没更新的缓存 while (it.hasNext()) { Entry<String, WeightedRoundRobin> item = it.next(); if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) { it.remove(); } } // 将当前newMap服务缓存中 methodWeightMap.put(key, newMap); } finally { updateLock.set(false); } } } // 如果存在被选中的 Invoker if (selectedInvoker != null) { // 计算 current = current - totalWeight selectedWRR.sel(totalWeight); return selectedInvoker; } // 正常情况这里不会到达 return invokers.get(0); }

}

上面选中 Invoker 逻辑为:每个 Invoker 都有一个 current 值,初始值为自身权重。在每个 Invoker 中current = current + weight。遍历完 Invoker 后,current 最大的那个 Invoker 就是本次选中的 Invoker。选中 Invoker 后,将本次 current 值计算current = current - totalWeight。


以上面192.168.1.10和192.168.1.11两个负载均衡的服务,权重分别为 4、6 。基于选中前current = current + weight、选中后current = current - totalWeight计算公式得出如下


7.LeastActiveLoadBalance(最少活跃数算法)

最小活跃数负载均衡策略(LeastActiveLoadBalance)是从最小活跃数的 Invoker 中进行选择。什么是活跃数呢?活跃数是一个 Invoker 正在处理的请求的数量,当 Invoker 开始处理请求时,会将活跃数加 1,完成请求处理后,将相应 Invoker 的活跃数减 1。找出最小活跃数后,最后根据权重进行选择最终的 Invoker。如果最后找出的最小活跃数相同,则随机从中选中一个 Invoker。


public class LeastActiveLoadBalance extends AbstractLoadBalance {

public static final String NAME = "leastactive";

@Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // Invoker 数量 int length = invokers.size(); // 所有 Invoker 中的最小活跃值都是 -1 int leastActive = -1; // 最小活跃值 Invoker 的数量 int leastCount = 0; // 最小活跃值 Invoker 在 Invokers 列表中对应的下标位置 int[] leastIndexes = new int[length]; // 保存每个 Invoker 的权重 int[] weights = new int[length]; // 总权重 int totalWeight = 0; // 第一个最小活跃数的权重 int firstWeight = 0; // 最小活跃数 Invoker 列表的权重是否一样 boolean sameWeight = true;

// 找出最小活跃数 Invoker 的下标 for (int i = 0; i < length; i++) { Invoker<T> invoker = invokers.get(i); // 获取最小活跃数 int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // 获取权重 int afterWarmup = getWeight(invoker, invocation); // 保存权重 weights[i] = afterWarmup; // 如果当前最小活跃数为-1(-1为最小值)或小于leastActive if (leastActive == -1 || active < leastActive) { // 重置最小活跃数 leastActive = active; // 重置最小活跃数 Invoker 的数量 leastCount = 1; // 保存当前 Invoker 在 Invokers 列表中的索引至leastIndexes数组中 leastIndexes[0] = i; // 重置最小活跃数 invoker 的总权重值 totalWeight = afterWarmup; // 记录当前 Invoker 权重为第一个最小活跃数 Invoker 的权重 firstWeight = afterWarmup; // 因为当前 Invoker 重置为第一个最小活跃数 Invoker ,所以标识所有最小活跃数 Invoker 权重都一样的值为 true sameWeight = true; // 如果当前最小活跃数和已声明的最小活跃数相等 } else if (active == leastActive) { // 记录当前 Invoker 的位置 leastIndexes[leastCount++] = i; // 累加当前 Invoker 权重到总权重中 totalWeight += afterWarmup; // 如果当前权重与firstWeight不相等,则将 sameWeight 改为 false if (sameWeight && i > 0 && afterWarmup != firstWeight) { sameWeight = false; } } } // 如果最小活跃数 Invoker 只有一个,直接返回该 Invoker if (leastCount == 1) { return invokers.get(leastIndexes[0]); } if (!sameWeight && totalWeight > 0) { // 根据权重随机从最小活跃数 Invoker 列表中选择一个 int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexes[i]; offsetWeight -= weights[leastIndex]; if (offsetWeight < 0) { return invokers.get(leastIndex); } } } // 如果所有 Invoker 的权重都一样则随机从 Invoker 列表中返回一个 return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]); }}


这段代码的整个逻辑就是,从 Invokers 列表中筛选出最小活跃数的 Invoker,然后类似加权随机算法策略方式选择最终的 Invoker 服务。


8.ConsistentHashLoadBalance(一致性Hash算法)

一致性 Hash 负载均衡策略(ConsistentHashLoadBalance)是让参数相同的请求分配到同一机器上。把每个服务节点分布在一个环上,请求也分布在环形中。以请求在环上的位置,顺时针寻找环上第一个服务节点。如图所示:


同时,为避免请求散列不均匀,dubbo 中会将每个 Invoker 再虚拟多个节点出来,使得请求调用更加均匀。


一致性 Hash 实现如下:


public class ConsistentHashLoadBalance extends AbstractLoadBalance {

public static final String NAME = "consistenthash";

/** * Hash nodes name */ public static final String HASH_NODES = "hash.nodes";

/** * Hash arguments name */ public static final String HASH_ARGUMENTS = "hash.arguments";

private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

@SuppressWarnings("unchecked") @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // 获取请求的方法名 String methodName = RpcUtils.getMethodName(invocation); // key = 接口名+方法名 String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName; // invokers 的 hashcode int identityHashCode = System.identityHashCode(invokers); // 查看缓存中是否存在对应 key 的数据,或 Invokers 列表是否有过变动。如果没有,则新添加到缓存中,并且返回负载均衡得出的 Invoker ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key); if (selector == null || selector.identityHashCode != identityHashCode) { selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode)); selector = (ConsistentHashSelector<T>) selectors.get(key); } return selector.select(invocation); }

// ConsistentHashSelector class ...}


doSelect 中主要实现缓存检查和 Invokers 变动检查,一致性 hash 负载均衡的实现在这个内部类 ConsistentHashSelector 中实现。


private static final class ConsistentHashSelector<T> {

// 存储虚拟节点 private final TreeMap<Long, Invoker<T>> virtualInvokers;
// 节点数 private final int replicaNumber;
// invoker 列表的 hashcode,用来判断 Invoker 列表是否变化 private final int identityHashCode;
// 请求中用来作Hash映射的参数的索引 private final int[] argumentIndex;
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) { this.virtualInvokers = new TreeMap<Long, Invoker<T>>(); this.identityHashCode = identityHashCode; URL url = invokers.get(0).getUrl(); // 获取节点数 this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160); // 获取配置中的 参数索引 String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0")); argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i++) { argumentIndex[i] = Integer.parseInt(index[i]); }
for (Invoker<T> invoker : invokers) { // 获取 Invoker 中的地址,包括端口号 String address = invoker.getUrl().getAddress(); // 创建虚拟节点 for (int i = 0; i < replicaNumber / 4; i++) { byte[] digest = md5(address + i); for (int h = 0; h < 4; h++) { long m = hash(digest, h); virtualInvokers.put(m, invoker); } } } }
// 找出 Invoker public Invoker<T> select(Invocation invocation) { // 将参数转为字符串 String key = toKey(invocation.getArguments()); // 字符串参数转换为 md5 byte[] digest = md5(key); // 根据 md5 找出 Invoker return selectForKey(hash(digest, 0)); }
// 将参数拼接成字符串 private String toKey(Object[] args) { StringBuilder buf = new StringBuilder(); for (int i : argumentIndex) { if (i >= 0 && i < args.length) { buf.append(args[i]); } } return buf.toString(); }
// 利用 md5 匹配到对应的 Invoker private Invoker<T> selectForKey(long hash) { // 找到第一个大于当前 hash 的 Invoker Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash); if (entry == null) { entry = virtualInvokers.firstEntry(); } return entry.getValue(); }
// hash 运算 private long hash(byte[] digest, int number) { return (((long) (digest[3 + number * 4] & 0xFF) << 24) | ((long) (digest[2 + number * 4] & 0xFF) << 16) | ((long) (digest[1 + number * 4] & 0xFF) << 8) | (digest[number * 4] & 0xFF)) & 0xFFFFFFFFL; }
// md5 运算 private byte[] md5(String value) { MessageDigest md5; try { md5 = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException(e.getMessage(), e); } md5.reset(); byte[] bytes = value.getBytes(StandardCharsets.UTF_8); md5.update(bytes); return md5.digest(); }

}


一致 hash 实现过程就是先创建好虚拟节点,虚拟节点保存在 TreeMap 中。TreeMap 的 key 为配置的参数先进行 md5 运算,然后将 md5 值进行 hash 运算。TreeMap 的 value 为被选中的 Invoker。


最后请求时,计算参数的 hash 值,去从 TreeMap 中获取 Invoker。


9.ShortestResponseLoadBalance(最短响应时间算法)

ShortestResponseLoadBalance 是Dubbo 2.7 版本之后新增加的一个 LoadBalance 实现类。它实现了最短响应时间的负载均衡算法,也就是从多个 Provider 节点中选出调用成功的且响应时间最短的 Provider 节点,不过满足该条件的 Provider 节点可能有多个,所以还要再使用随机算法进行一次选择,得到最终要调用的 Provider 节点。


了解了 ShortestResponseLoadBalance 的核心原理之后,我们一起来看 ShortestResponseLoadBalance.doSelect() 方法的核心实现,如下所示:


public class ShortestResponseLoadBalance extends AbstractLoadBalance {

public static final String NAME = "shortestresponse";

@Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { // 记录Invoker集合的数量 int length = invokers.size(); // 用于记录所有Invoker集合中最短响应时间 long shortestResponse = Long.MAX_VALUE; // 具有相同最短响应时间的Invoker个数 int shortestCount = 0; // 存放所有最短响应时间的Invoker的下标 int[] shortestIndexes = new int[length]; // 存储每个Invoker的权重 int[] weights = new int[length]; // 存储权重总和 int totalWeight = 0; // 记录第一个Invoker对象的权重 int firstWeight = 0; // 最短响应时间Invoker集合中的Invoker权重是否相同 boolean sameWeight = true;

// Filter out all the shortest response invokers for (int i = 0; i < length; i++) { Invoker<T> invoker = invokers.get(i); RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()); // 获取调用成功的平均时间,具体计算方式是:调用成功的请求数总数对应的总耗时 / 调用成功的请求数总数 = 成功调用的平均时间 // RpcStatus 的内容在前面课时已经介绍过了,这里不再重复 long succeededAverageElapsed = rpcStatus.getSucceededAverageElapsed(); // 获取的是该Provider当前的活跃请求数,也就是当前正在处理的请求数 int active = rpcStatus.getActive(); // 计算一个处理新请求的预估值,也就是如果当前请求发给这个Provider,大概耗时多久处理完成 long estimateResponse = succeededAverageElapsed * active; // 计算该Invoker的权重(主要是处理预热) int afterWarmup = getWeight(invoker, invocation); weights[i] = afterWarmup; // Same as LeastActiveLoadBalance if (estimateResponse < shortestResponse) { // 第一次找到Invoker集合中最短响应耗时的Invoker对象,记录其相关信息 shortestResponse = estimateResponse; shortestCount = 1; shortestIndexes[0] = i; totalWeight = afterWarmup; firstWeight = afterWarmup; sameWeight = true; } else if (estimateResponse == shortestResponse) { // 出现多个耗时最短的Invoker对象 shortestIndexes[shortestCount++] = i; totalWeight += afterWarmup; if (sameWeight && i > 0 && afterWarmup != firstWeight) { sameWeight = false; } } } if (shortestCount == 1) { return invokers.get(shortestIndexes[0]); } // 如果耗时最短的所有Invoker对象的权重不相同,则通过加权随机负载均衡的方式选择一个Invoker返回 if (!sameWeight && totalWeight > 0) { int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); for (int i = 0; i < shortestCount; i++) { int shortestIndex = shortestIndexes[i]; offsetWeight -= weights[shortestIndex]; if (offsetWeight < 0) { return invokers.get(shortestIndex); } } } // 如果耗时最短的所有Invoker对象的权重相同,则随机返回一个 return invokers.get(shortestIndexes[ThreadLocalRandom.current().nextInt(shortestCount)]); }}



总结

本文学习了dubbo中常见的几种负载均衡算法的实现,可以多多学习其编码思维。在研究其代码时,需要仔细研究其实现原理,否则比较难懂其思想。





来源:https://blog.csdn.net/qq_31960623/

作者:wh柒八九

 THE END


推荐阅读  

面试官:Redis和Mysql如何保证数据一致性

CompletableFuture的正确打开方式

SpringBoot2.x 整合ShardingSphere5.x实现分表

Spring注入Bean的方式你知道几种?

点赞+在看,关注公众号回复“666”领取福利

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存