查看原文
其他

源码分析Dubbo override实现原理

微信公众号:[中间件兴趣圈]
作者简介:《RocketMQ技术内幕》作者

在上篇在讲解源码分析Dubbo路由注册与发现机制的时候,dubbo管理员可以通过dubbo-admin管理系统在线上修改dubbo服务提供者的参数,最终将存储在注册中心的configurators catalog,然后通知RegistryDirectory更新服务提供者的URL中相关属性,按照最新的配置,重新创建Invoker并销毁原来的Invoker。

override协议简介

有关官方文档关于动态改变配置(override协议)的详细描述如下:

dubbo-admin 管理后台,界面如下:


当Dubbo管理人员在上述界面,选择配置后点击保存,会构建override:// url存入到注册中心(configurators) catalog下,此时基于注册中心发现服务提供者的监听器(RegistryDirectory)会收到回调(notify)方法,接下来我们再来看一下RegistryDirectory#notify方法。

RegistryDirectory#notify

1public synchronized void notify(List<URL> urls) {        // @1
2        List<URL> invokerUrls = new ArrayList<URL>();
3        List<URL> routerUrls = new ArrayList<URL>();
4        List<URL> configuratorUrls = new ArrayList<URL>();
5        for (URL url : urls) {
6            String protocol = url.getProtocol();
7            String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
8            if (Constants.ROUTERS_CATEGORY.equals(category) 
9                    || Constants.ROUTE_PROTOCOL.equals(protocol)) {
10                routerUrls.add(url);
11            } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) 
12                    || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
13                configuratorUrls.add(url);
14            } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
15                invokerUrls.add(url);
16            } else {
17                logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + 
18                     NetUtils.getLocalHost());
19            }
20        }
21        // configurators 
22        if (configuratorUrls != null && configuratorUrls.size() >0 ){       // @2
23            this.configurators = toConfigurators(configuratorUrls);
24        }
25        // routers
26        if (routerUrls != null && routerUrls.size() >0 ){
27            List<Router> routers = toRouters(routerUrls);
28            if(routers != null){ // null - do nothing
29                setRouters(routers);
30            }
31        }
32        List<Configurator> localConfigurators = this.configurators; // local reference
33        // 合并override参数
34        this.overrideDirectoryUrl = directoryUrl;
35        if (localConfigurators != null && localConfigurators.size() > 0) {
36            for (Configurator configurator : localConfigurators) {
37                this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
38            }
39        }
40        // providers
41        refreshInvoker(invokerUrls);    // @3
42}

由于这个方法的实现在上一篇文章《源码分析Dubbo服务发现机制(RegistryDirectory)》中详细分析,故这里只列出与本文章相关的关注点:
代码@1:参数为当前configurators目录下所有的URL,例如:

1urls: [override://0.0.0.0/com.wuys.frame.api.service.IUserService?category=configurators&dynamic=false&enabled=true&timeout=10000override://0.0.0.0/com.wuys.frame.api.service.IUserService?category=configurators&dynamic=false&enabled=true&weight=200]。

代码@2:将override url转换为List< Configurator>,是本节重点要讨论的内容。
代码@3:调用refreshInvoker方法,由于这里的invokerUrls为空,此时会对原先的invoker马上应用新的配置参数吗?带着这个疑问,我们先看一下refreshInvoker是如何处理的,然后回头重点分析代码@2的实现细节。
关于refreshInvoker的实现,在上一篇源码分析Dubbo服务注册与发现机制RegistryDirectory)中也详细分析过,这里只是为了求证一下:

1if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null){
2       invokerUrls.addAll(this.cachedInvokerUrls);
3 } else {
4      this.cachedInvokerUrls = new HashSet<URL>();
5            this.cachedInvokerUrls.addAll(invokerUrls);//缓存invokerUrls列表,便于交叉对比
6      }
7     if (invokerUrls.size() ==0 ){
8          return;
9     }
10   // 省略部分代码
11}

从这里看出,如果invokerUrls为空,如果已缓存的服务提供者不为空,则将已缓存的服务提供者加入到invokerUrls中,此时invokerUrls不为空,则会重新用新的配置生成新的invoker,然后销毁原先的invoker。

接下来重点分析Dubbo关于override协议的解析实现细节。

override核心类图

  1. Configurator:协议配置接口,主要抽象出两个接口方法:

  • URL getUrl():获取配置URL。

  • URL configure(URL url):根据configureUrl来配置 URL url。

  1. AbstractConfigurator:协议配置抽象实现类(模板类)。

  2. AbsentConfigurator:absent配置器,其策略是,如果configureUrl存在的属性,则不覆盖。

  3. OverrideConfigurator:override配置器,其策略是,直接覆盖属性。

源码分析Override实现原理

源码分析AbstractConfigurator

AbstractConfigurator#configure

1public URL configure(URL url) {
2        if (configuratorUrl == null || configuratorUrl.getHost() == null
3                || url == null || url.getHost() == null) {     // @1
4            return url;
5        }
6
7        if (configuratorUrl.getPort() != 0) {          //  @2
8            if (url.getPort() == configuratorUrl.getPort()) {
9                return configureIfMatch(url.getHost(), url);      // @3
10            }
11        } else {
12            if (url.getParameter(Constants.SIDE_KEY, Constants.PROVIDER).equals(Constants.CONSUMER)) {   // @4
13                return configureIfMatch(NetUtils.getLocalHost(), url);// NetUtils.getLocalHost is the ip address consumer registered to registry.
14            } else if (url.getParameter(Constants.SIDE_KEY, Constants.CONSUMER).equals(Constants.PROVIDER)) {   // @5
15                return configureIfMatch(Constants.ANYHOST_VALUE, url);
16            }
17        }
18        return url;
19    }

代码@1:如果configuratorUrl (配置URL)为空host为空,或url为空或host为空,则返回url。这里参数的覆盖方向  configuratorUrl ----> url。
代码@2:如果configuratorUrl如果端口不为空,则需要判断url的端口,端口必须相同,才执行configuratorUrl配置url。
代码@3,执行具体的配置操作,下文待分析。
代码@4、@5:如果端口为空,该配置URL(configuratorUrl)的类型要么是针对消费者,要么地址是0.0.0.0(任意)。

如果url属于服务消费者,host为消费者的注册IP地址,如果是服务提供者,则host为0.0.0.0来配置。

AbstractConfigurator#configureIfMatch

1private URL configureIfMatch(String host, URL url) {
2        if (Constants.ANYHOST_VALUE.equals(configuratorUrl.getHost()) || host.equals(configuratorUrl.getHost())) {
3            String configApplication = configuratorUrl.getParameter(Constants.APPLICATION_KEY,
4                    configuratorUrl.getUsername());
5            String currentApplication = url.getParameter(Constants.APPLICATION_KEY, url.getUsername());
6            if (configApplication == null || Constants.ANY_VALUE.equals(configApplication)
7                    || configApplication.equals(currentApplication)) {
8                Set<String> condtionKeys = new HashSet<String>();
9                condtionKeys.add(Constants.CATEGORY_KEY);
10                condtionKeys.add(Constants.CHECK_KEY);
11                condtionKeys.add(Constants.DYNAMIC_KEY);
12                condtionKeys.add(Constants.ENABLED_KEY);
13                for (Map.Entry<String, String> entry : configuratorUrl.getParameters().entrySet()) {
14                    String key = entry.getKey();
15                    String value = entry.getValue();
16                    if (key.startsWith("~") || Constants.APPLICATION_KEY.equals(key) || Constants.SIDE_KEY.equals(key)) {
17                        condtionKeys.add(key);
18                        if (value != null && !Constants.ANY_VALUE.equals(value)
19                                && !value.equals(url.getParameter(key.startsWith("~") ? key.substring(1) : key))) {
20                            return url;
21                        }
22                    }
23                }
24                return doConfigure(url, configuratorUrl.removeParameters(condtionKeys));
25            }
26        }
27        return url;
28    }

该方法主要实现的功能就是排除不能动态修改的属性,不支持属性主要包括:category、check、dynamic、enabled、还有以~开头的属性,并且如果~开头的属性,配置URL与原URL的值不相同,则不使用该配置URL重写原URL。将配置URL(configuratorUrl)移除不支持属性后,调用其子类的doConfigure方法覆盖属性,Dubbo默认支持如下覆盖策略

  • override 直接覆盖。

  • absent,如果原先存在该属性的配置,则以原先配置的属性值优先,如果原先没有配置该属性,则添加新的配置属性。

总结一下:当在dubbo-admin(管理后台)中创建一条override规则后,会首先存储在注册中心(zookeeper的指定目录下${service}/configurators目录下,此时基于注册中心的事件机制,会通知相关监听者(服务消费者),服务消费者收到最新的配置时,会根据最新的配置重新构建Invoker对象,然后销毁原先的Invoker对象。


广告:作者新书《RocketMQ技术内幕》已上市

《RocketMQ技术内幕》已出版上市,目前可在主流购物平台(京东、天猫等)购买,本书从源码角度深度分析了RocketMQ NameServer、消息发送、消息存储、消息消费、消息过滤、主从同步HA、事务消息;在实战篇重点介绍了RocketMQ运维管理界面与当前支持的39个运维命令;并在附录部分罗列了RocketMQ几乎所有的配置参数。本书得到了RocketMQ创始人、阿里巴巴Messaging开源技术负责人、Linux OpenMessaging 主席的高度认可并作序推荐。目前是国内第一本成体系剖析RocketMQ的书籍。
新书7折优惠!7折优惠!7折优惠!


更多文章请关注微信公众号:

推荐关注微信公众号:RocketMQ官方微信公众号


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存