源码分析Dubbo override实现原理
微信公众号:[中间件兴趣圈]
作者简介:《RocketMQ技术内幕》作者
在上篇在讲解源码分析Dubbo路由注册与发现机制的时候,dubbo管理员可以通过dubbo-admin管理系统在线上修改dubbo服务提供者的参数,最终将存储在注册中心的configurators catalog,然后通知RegistryDirectory更新服务提供者的URL中相关属性,按照最新的配置,重新创建Invoker并销毁原来的Invoker。
override协议简介
有关官方文档关于动态改变配置(override协议)的详细描述如下:
当Dubbo管理人员在上述界面,选择配置后点击保存,会构建override:// url存入到注册中心(configurators) catalog下,此时基于注册中心发现服务提供者的监听器(RegistryDirectory)会收到回调(notify)方法,接下来我们再来看一下RegistryDirectory#notify方法。
RegistryDirectory#notify
1public synchronized void notify(List<URL> urls) { // @1
2 List<URL> invokerUrls = new ArrayList<URL>();
3 List<URL> routerUrls = new ArrayList<URL>();
4 List<URL> configuratorUrls = new ArrayList<URL>();
5 for (URL url : urls) {
6 String protocol = url.getProtocol();
7 String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
8 if (Constants.ROUTERS_CATEGORY.equals(category)
9 || Constants.ROUTE_PROTOCOL.equals(protocol)) {
10 routerUrls.add(url);
11 } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
12 || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
13 configuratorUrls.add(url);
14 } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
15 invokerUrls.add(url);
16 } else {
17 logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " +
18 NetUtils.getLocalHost());
19 }
20 }
21 // configurators
22 if (configuratorUrls != null && configuratorUrls.size() >0 ){ // @2
23 this.configurators = toConfigurators(configuratorUrls);
24 }
25 // routers
26 if (routerUrls != null && routerUrls.size() >0 ){
27 List<Router> routers = toRouters(routerUrls);
28 if(routers != null){ // null - do nothing
29 setRouters(routers);
30 }
31 }
32 List<Configurator> localConfigurators = this.configurators; // local reference
33 // 合并override参数
34 this.overrideDirectoryUrl = directoryUrl;
35 if (localConfigurators != null && localConfigurators.size() > 0) {
36 for (Configurator configurator : localConfigurators) {
37 this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
38 }
39 }
40 // providers
41 refreshInvoker(invokerUrls); // @3
42}
由于这个方法的实现在上一篇文章《源码分析Dubbo服务发现机制(RegistryDirectory)》中详细分析,故这里只列出与本文章相关的关注点:
代码@1:参数为当前configurators目录下所有的URL,例如:
1urls: [override://0.0.0.0/com.wuys.frame.api.service.IUserService?category=configurators&dynamic=false&enabled=true&timeout=10000, override://0.0.0.0/com.wuys.frame.api.service.IUserService?category=configurators&dynamic=false&enabled=true&weight=200]。
代码@2:将override url转换为List< Configurator>,是本节重点要讨论的内容。
代码@3:调用refreshInvoker方法,由于这里的invokerUrls为空,此时会对原先的invoker马上应用新的配置参数吗?带着这个疑问,我们先看一下refreshInvoker是如何处理的,然后回头重点分析代码@2的实现细节。
关于refreshInvoker的实现,在上一篇源码分析Dubbo服务注册与发现机制RegistryDirectory)中也详细分析过,这里只是为了求证一下:
1if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null){
2 invokerUrls.addAll(this.cachedInvokerUrls);
3 } else {
4 this.cachedInvokerUrls = new HashSet<URL>();
5 this.cachedInvokerUrls.addAll(invokerUrls);//缓存invokerUrls列表,便于交叉对比
6 }
7 if (invokerUrls.size() ==0 ){
8 return;
9 }
10 // 省略部分代码
11}
从这里看出,如果invokerUrls为空,如果已缓存的服务提供者不为空,则将已缓存的服务提供者加入到invokerUrls中,此时invokerUrls不为空,则会重新用新的配置生成新的invoker,然后销毁原先的invoker。
接下来重点分析Dubbo关于override协议的解析实现细节。
override核心类图
Configurator:协议配置接口,主要抽象出两个接口方法:
URL getUrl():获取配置URL。
URL configure(URL url):根据configureUrl来配置 URL url。
AbstractConfigurator:协议配置抽象实现类(模板类)。
AbsentConfigurator:absent配置器,其策略是,如果configureUrl存在的属性,则不覆盖。
OverrideConfigurator:override配置器,其策略是,直接覆盖属性。
源码分析Override实现原理
源码分析AbstractConfigurator
AbstractConfigurator#configure
1public URL configure(URL url) {
2 if (configuratorUrl == null || configuratorUrl.getHost() == null
3 || url == null || url.getHost() == null) { // @1
4 return url;
5 }
6
7 if (configuratorUrl.getPort() != 0) { // @2
8 if (url.getPort() == configuratorUrl.getPort()) {
9 return configureIfMatch(url.getHost(), url); // @3
10 }
11 } else {
12 if (url.getParameter(Constants.SIDE_KEY, Constants.PROVIDER).equals(Constants.CONSUMER)) { // @4
13 return configureIfMatch(NetUtils.getLocalHost(), url);// NetUtils.getLocalHost is the ip address consumer registered to registry.
14 } else if (url.getParameter(Constants.SIDE_KEY, Constants.CONSUMER).equals(Constants.PROVIDER)) { // @5
15 return configureIfMatch(Constants.ANYHOST_VALUE, url);
16 }
17 }
18 return url;
19 }
代码@1:如果configuratorUrl (配置URL)为空host为空,或url为空或host为空,则返回url。这里参数的覆盖方向 configuratorUrl ----> url。
代码@2:如果configuratorUrl如果端口不为空,则需要判断url的端口,端口必须相同,才执行configuratorUrl配置url。
代码@3,执行具体的配置操作,下文待分析。
代码@4、@5:如果端口为空,该配置URL(configuratorUrl)的类型要么是针对消费者,要么地址是0.0.0.0(任意)。
如果url属于服务消费者,host为消费者的注册IP地址,如果是服务提供者,则host为0.0.0.0来配置。
AbstractConfigurator#configureIfMatch
1private URL configureIfMatch(String host, URL url) {
2 if (Constants.ANYHOST_VALUE.equals(configuratorUrl.getHost()) || host.equals(configuratorUrl.getHost())) {
3 String configApplication = configuratorUrl.getParameter(Constants.APPLICATION_KEY,
4 configuratorUrl.getUsername());
5 String currentApplication = url.getParameter(Constants.APPLICATION_KEY, url.getUsername());
6 if (configApplication == null || Constants.ANY_VALUE.equals(configApplication)
7 || configApplication.equals(currentApplication)) {
8 Set<String> condtionKeys = new HashSet<String>();
9 condtionKeys.add(Constants.CATEGORY_KEY);
10 condtionKeys.add(Constants.CHECK_KEY);
11 condtionKeys.add(Constants.DYNAMIC_KEY);
12 condtionKeys.add(Constants.ENABLED_KEY);
13 for (Map.Entry<String, String> entry : configuratorUrl.getParameters().entrySet()) {
14 String key = entry.getKey();
15 String value = entry.getValue();
16 if (key.startsWith("~") || Constants.APPLICATION_KEY.equals(key) || Constants.SIDE_KEY.equals(key)) {
17 condtionKeys.add(key);
18 if (value != null && !Constants.ANY_VALUE.equals(value)
19 && !value.equals(url.getParameter(key.startsWith("~") ? key.substring(1) : key))) {
20 return url;
21 }
22 }
23 }
24 return doConfigure(url, configuratorUrl.removeParameters(condtionKeys));
25 }
26 }
27 return url;
28 }
该方法主要实现的功能就是排除不能动态修改的属性,不支持属性主要包括:category、check、dynamic、enabled、还有以~开头的属性,并且如果~开头的属性,配置URL与原URL的值不相同,则不使用该配置URL重写原URL。将配置URL(configuratorUrl)移除不支持属性后,调用其子类的doConfigure方法覆盖属性,Dubbo默认支持如下覆盖策略
override 直接覆盖。
absent,如果原先存在该属性的配置,则以原先配置的属性值优先,如果原先没有配置该属性,则添加新的配置属性。
总结一下:当在dubbo-admin(管理后台)中创建一条override规则后,会首先存储在注册中心(zookeeper的指定目录下${service}/configurators目录下,此时基于注册中心的事件机制,会通知相关监听者(服务消费者),服务消费者收到最新的配置时,会根据最新的配置重新构建Invoker对象,然后销毁原先的Invoker对象。
广告:作者新书《RocketMQ技术内幕》已上市
《RocketMQ技术内幕》已出版上市,目前可在主流购物平台(京东、天猫等)购买,本书从源码角度深度分析了RocketMQ NameServer、消息发送、消息存储、消息消费、消息过滤、主从同步HA、事务消息;在实战篇重点介绍了RocketMQ运维管理界面与当前支持的39个运维命令;并在附录部分罗列了RocketMQ几乎所有的配置参数。本书得到了RocketMQ创始人、阿里巴巴Messaging开源技术负责人、Linux OpenMessaging 主席的高度认可并作序推荐。目前是国内第一本成体系剖析RocketMQ的书籍。
新书7折优惠!7折优惠!7折优惠!
更多文章请关注微信公众号:
推荐关注微信公众号:RocketMQ官方微信公众号