源码分析Dubbo服务提供者、服务消费者并发度控制机制
微信公众号:[中间件兴趣圈]
作者简介:《RocketMQ技术内幕》
本文将详细分析<dubbo:service executes=""/>与<dubbo:reference actives = ""/>的实现机制,深入探讨Dubbo自身的保护机制。
源码分析ExecuteLimitFilter
@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY )
过滤器作用
服务调用方并发度控制。使用场景
对Dubbo服务提供者实现的一种保护机制,控制每个服务的最大并发度。阻断条件
当服务调用超过允许的并发度后,直接抛出RpcException异常。
接下来源码分析ExecuteLimitFilter的实现细节。
ExecuteLimitFilter#invoke
1public Result invoke(Invoker > invoker, Invocation invocation) throws RpcException {
2 URL url = invoker.getUrl();
3 String methodName = invocation.getMethodName();
4 Semaphore executesLimit = null;
5 boolean acquireResult = false;
6 int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0); // @1
7 if (max > 0) {
8 RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName()); // @2
9 executesLimit = count.getSemaphore(max); // @3
10 if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) { // @4
11 throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads
12 greater than <dubbo:service executes=\"" + max + "\" /> limited.");
13 }
14 }
15 boolean isSuccess = true;
16 try {
17 Result result = invoker.invoke(invocation); // @5
18 return result;
19 } catch (Throwable t) {
20 isSuccess = false;
21 if (t instanceof RuntimeException) {
22 throw (RuntimeException) t;
23 } else {
24 throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
25 }
26 } finally {
27 if(acquireResult) { // @6
28 executesLimit.release();
29 }
30 }
31 }
代码@1:从服务提供者列表中获取参数executes的值,如果该值小于等于0,表示不启用并发度控制,直接沿着调用链进行调用。
代码@2:根据服务提供者url和服务调用方法名,获取RpcStatus。
1public static RpcStatus getStatus(URL url, String methodName) {
2 String uri = url.toIdentityString();
3 ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri);
4 if (map == null) {
5 METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMap<String, RpcStatus>());
6 map = METHOD_STATISTICS.get(uri);
7 }
8 RpcStatus status = map.get(methodName); /
9 if (status == null) {
10 map.putIfAbsent(methodName, new RpcStatus());
11 status = map.get(methodName);
12 }
13 return status;
14 }
这里是并发容器ConcurrentHashMap的经典使用,从这里可以看出ConcurrentMap<String, ConcurrentMap< String, RpcStatus>> METHOD_STATISTICS的存储结构为 { 服务提供者URL唯一字符串:{方法名:RpcStatus} }。
代码@3:根据服务提供者配置的最大并发度,创建该服务该方法对应的信号量对象。
1public Semaphore getSemaphore(int maxThreadNum) {
2 if(maxThreadNum <= 0) {
3 return null;
4 }
5 if (executesLimit == null || executesPermits != maxThreadNum) {
6 synchronized (this) {
7 if (executesLimit == null || executesPermits != maxThreadNum) {
8 executesLimit = new Semaphore(maxThreadNum);
9 executesPermits = maxThreadNum;
10 }
11 }
12 }
13 return executesLimit;
14 }
使用了双重检测来创建executesLimit 信号量。
代码@4:如果获取不到锁,并不会阻塞等待,而是直接抛出RpcException,服务端的策略是快速抛出异常,供服务调用方(消费者)根据集群策略进行执行,例如重试其他服务提供者。
代码@5:执行真实的服务调用。
代码@6:如果成功申请到信号量,在服务调用结束后,释放信号量。
总结:<dubbo:service executes=""/>的含义是,针对每个服务每个方法的最大并发度。如果超过该值,则直接抛出RpcException。
源码分析ActiveLimitFilter
@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY )
过滤器作用
消费端调用服务的并发控制。使用场景
控制同一个消费端对服务端某一服务的并发调用度,通常该值应该小于< dubbo:service executes=""/>阻断条件
非阻断,但如果超过允许的并发度会阻塞,超过超时时间后将不再调用服务,而是直接抛出超时。
ActiveLimitFilter#invoke
1public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
2 URL url = invoker.getUrl();
3 String methodName = invocation.getMethodName();
4 int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0); // @1
5 RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()); // @2
6 if (max > 0) {
7 long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0); // @3
8 long start = System.currentTimeMillis();
9 long remain = timeout;
10 int active = count.getActive(); // @4
11 if (active >= max) { // @5
12 synchronized (count) {
13 while ((active = count.getActive()) >= max) {
14 try {
15 count.wait(remain);
16 } catch (InterruptedException e) {
17 }
18 long elapsed = System.currentTimeMillis() - start;
19 remain = timeout - elapsed;
20 if (remain <= 0) { // @6
21 throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
22 + invoker.getInterface().getName() + ", method: "
23 + invocation.getMethodName() + ", elapsed: " + elapsed
24 + ", timeout: " + timeout + ". concurrent invokes: " + active
25 + ". max concurrent invoke limit: " + max);
26 }
27 }
28 }
29 }
30 }
31 try {
32 long begin = System.currentTimeMillis();
33 RpcStatus.beginCount(url, methodName); // @7
34 try {
35 Result result = invoker.invoke(invocation); // @8
36 RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true); // @9
37 return result;
38 } catch (RuntimeException t) {
39 RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
40 throw t;
41 }
42 } finally {
43 if (max > 0) {
44 synchronized (count) {
45 count.notify(); // @10
46 }
47 }
48 }
49 }
代码@1:从Invoker中获取消息端URL中的配置的actives参数,为什么从Invoker中获取的Url是消费端的Url呢?这是因为在消费端根据服务提供者URL创建调用Invoker时,会用服务提供者URL,然后合并消费端的配置属性,其优先级 -D > 消费端 > 服务端。其代码位于:
RegistryDirectory#toInvokers
1URL url = mergeUrl(providerUrl);
代码@2:根据服务提供者URL和调用服务提供者方法,获取RpcStatus。
代码@3:获取接口调用的超时时间,默认为1s。
代码@4:获取当前消费者,针对特定服务,特定方法的并发调用度,active值。
代码@5:如果当前的并发 调用大于等于允许的最大值,则针对该RpcStatus申请锁,并调用其wait(timeout)进行等待,也就是在接口调用超时时间内,还是未被唤醒,则直接抛出超时异常。
代码@6:判断被唤醒的原因是因为等待超时,还是由于调用结束,释放了"名额“,如果是超时唤醒,则直接抛出异常。
代码@7:在一次服务调用前,先将 服务名+方法名对应的RpcStatus的active加一。
代码@8:执行RPC服务调用。
代码@9:记录成功调用或失败调用,并将active减一。
代码@10:最终成功执行,如果开启了actives机制(dubbo:referecnce actives="")时,唤醒等待者。
总结:<dubbo:reference actives=""/> 是控制消费端对单个服务提供者单个服务允许调用的最大并发度。该值的取值不应该大于<dubbo:service executes=""/>的值,并且如果消费者机器的配置,如果性能不尽相同,不建议对该值进行设置。
广告:作者的新书《RocketMQ技术内幕》已上市
《RocketMQ技术内幕》已出版上市,目前可在主流购物平台(京东、天猫等)购买,本书从源码角度深度分析了RocketMQ NameServer、消息发送、消息存储、消息消费、消息过滤、主从同步HA、事务消息;在实战篇重点介绍了RocketMQ运维管理界面与当前支持的39个运维命令;并在附录部分罗列了RocketMQ几乎所有的配置参数。本书得到了RocketMQ创始人、阿里巴巴Messaging开源技术负责人、Linux OpenMessaging 主席的高度认可并作序推荐。目前是国内第一本成体系剖析RocketMQ的书籍。
新书7折优惠!7折优惠!7折优惠!
更多文章请关注微信公众号:
推荐关注微信公众号:RocketMQ官方微信公众号