熔断器 Hystrix 源码解析 —— 命令合并执行
摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-collapser-execute/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 Hystrix 1.5.X 版本
1. 概述
2. HystrixCollapser
2.1 构造方法
2.2 执行命令方式
2.3 核心方法
3. RequestCollapserFactory
4. RequestCollapser
4.1 构造方法
4.2 RequestBatch
4.3 #submitRequest(arg)
4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)
5. CollapserTimer
5.1 RealCollapserTimer
5.2 CollapsedTask
666. 彩蛋
友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。
友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。
友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。
1. 概述
本文主要分享 Hystrix 命令合并执行。
在 《【翻译】Hystrix文档-实现原理》「请求合并」 中,对 Hystrix 命令合并执行的概念、原理、使用场景、优缺点已经做了非常详细透彻的分享,所以胖友可以先认真阅读学习下。
命令合并执行整体流程如下图 :
FROM 《【翻译】Hystrix文档-实现原理》「请求合并」
第一步,提交单个命令请求到请求队列( RequestQueue )
第二部,定时任务( TimerTask ) 固定周期从请求队列获取多个命令执行,合并执行。
在官方提供的示例中,我们通过 CommandCollapserGetValueForKey 熟悉命令合并执行的使用。
推荐 Spring Cloud 书籍:
请支持正版。下载盗版,等于主动编写低级 BUG 。
程序猿DD —— 《Spring Cloud微服务实战》
周立 —— 《Spring Cloud与Docker微服务架构实战》
两书齐买,京东包邮。
2. HystrixCollapser
com.netflix.hystrix.HystrixCollapser
,命令合并器抽象父类。
NOTE :
com.netflix.hystrix.HystrixObservableCollapser
,另一种命令合并器抽象父类,本文暂不解析。
2.1 构造方法
HystrixCollapser 构造方法,代码如下 :
public abstract class HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType>
implements HystrixExecutable<ResponseType>, HystrixObservable<ResponseType> {
private final RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType> collapserFactory;
private final HystrixRequestCache requestCache;
private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> collapserInstanceWrapper;
private final HystrixCollapserMetrics metrics;
/* package for tests */ HystrixCollapser(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder, HystrixCollapserMetrics metrics) {
if (collapserKey == null || collapserKey.name().trim().equals("")) {
String defaultKeyName = getDefaultNameFromClass(getClass());
collapserKey = HystrixCollapserKey.Factory.asKey(defaultKeyName);
}
HystrixCollapserProperties properties = HystrixPropertiesFactory.getCollapserProperties(collapserKey, propertiesBuilder);
this.collapserFactory = new RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType>(collapserKey, scope, timer, properties);
this.requestCache = HystrixRequestCache.getInstance(collapserKey, HystrixPlugins.getInstance().getConcurrencyStrategy());
if (metrics == null) {
this.metrics = HystrixCollapserMetrics.getInstance(collapserKey, properties);
} else {
this.metrics = metrics;
}
final HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType> self = this;
/* strategy: HystrixMetricsPublisherCollapser */
HystrixMetricsPublisherFactory.createOrRetrievePublisherForCollapser(collapserKey, this.metrics, properties);
/**
* Used to pass public method invocation to the underlying implementation in a separate package while leaving the methods 'protected' in this class.
*/
collapserInstanceWrapper = new HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType>() {
@Override
public Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shardRequests(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = self.shardRequests(requests);
self.metrics.markShards(shards.size());
return shards;
}
@Override
public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
final HystrixCommand<BatchReturnType> command = self.createCommand(requests);
command.markAsCollapsedCommand(this.getCollapserKey(), requests.size());
self.metrics.markBatch(requests.size());
return command.toObservable();
}
@Override
public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
return batchResponse.single().doOnNext(new Action1<BatchReturnType>() {
@Override
public void call(BatchReturnType batchReturnType) {
// this is a blocking call in HystrixCollapser
self.mapResponseToRequests(batchReturnType, requests);
}
}).ignoreElements().cast(Void.class);
}
@Override
public HystrixCollapserKey getCollapserKey() {
return self.getCollapserKey();
}
};
}
}
BatchReturnType 泛型,多个命令合并执行返回结果类型。
ResponseType 泛型,单个命令执行返回结果类型。
RequestArgumentType 泛型,单个命令参数类型。
collapserFactory
属性,RequestCollapser 工厂,在 「3. RequestCollapserFactory」 详细解析。requestCache
属性,TODO 【2012】【请求上下文】collapserInstanceWrapper
属性,命令合并器包装器。com.netflix.hystrix.collapser.HystrixCollapserBridge
接口,点击 链接 查看代码。HystrixCollapserBridge ,为 RequestBatch 透明调用 HystrixCollapser 或 HystrixObservableCollapser 的方法不同的实现。参见 《桥接模式》 。
metrics
属性,TODO 【2002】【metrics】
2.2 执行命令方式
在 《Hystrix 源码解析 —— 执行命令方式》 中,我们已经看了 HystrixCommand 提供的四种执行命令方式。
HystrixCollapser 类似于 HystrixCommand ,也提供四种相同的执行命令方式,其中如下三种方式代码基本类似,我们就给下传送门,就不重复啰嗦了 :
#observe()
方法 :传送门 。#queue()
方法 :传送门 。#execute()
方法 :传送门 。
下面一起来看看 #toObservable()
方法的实现,代码如下 :
1: public Observable<ResponseType> toObservable() {
2: // when we callback with the data we want to do the work
3: // on a separate thread than the one giving us the callback
4: return toObservable(Schedulers.computation());
5: }
6:
7: public Observable<ResponseType> toObservable(Scheduler observeOn) {
8: return Observable.defer(new Func0<Observable<ResponseType>>() {
9: @Override
10: public Observable<ResponseType> call() {
11: // // 缓存开关、缓存KEY
12: final boolean isRequestCacheEnabled = getProperties().requestCacheEnabled().get();
13: final String cacheKey = getCacheKey();
14:
15: // 优先从缓存中获取
16: /* try from cache first */
17: if (isRequestCacheEnabled) {
18: HystrixCachedObservable<ResponseType> fromCache = requestCache.get(cacheKey);
19: if (fromCache != null) {
20: metrics.markResponseFromCache();
21: return fromCache.toObservable();
22: }
23: }
24:
25: // 获得 RequestCollapser
26: RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper);
27:
28: // 提交 命令请求
29: Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument());
30:
31: // 获得 缓存Observable
32: if (isRequestCacheEnabled && cacheKey != null) {
33: HystrixCachedObservable<ResponseType> toCache = HystrixCachedObservable.from(response);
34: HystrixCachedObservable<ResponseType> fromCache = requestCache.putIfAbsent(cacheKey, toCache);
35: if (fromCache == null) {
36: return toCache.toObservable();
37: } else {
38: toCache.unsubscribe(); // 取消订阅
39: return fromCache.toObservable();
40: }
41: }
42:
43: // 获得 非缓存Observable
44: return response;
45: }
46: });
47: }
observeOn
方法参数,实际方法暂未用到,跳过无视。第 11 至 13 行 :缓存存开关、KEY 。
【反向】第 32 至 41 行 :获得【缓存 Observable】。这块代码和
AbstractCommand#toObservavle(...)
类似,在《Hystrix 源码解析 —— 执行结果缓存》「4. AbstractCommand#toObservavle(...)」 有详细解析。【反向】第 44 行 :获得【非缓存 Observable】。
注意 :返回的 Observable ,很可能命令实际并未执行,或者说并未执行完成,此时在
#queue()
/#execute()
方法,通过 BlockingObservable 阻塞等待执行完成。BlockingObservable 在 《RxJava 源码解析 —— BlockingObservable》 有详细解析。第 26 行 :调用
RequestCollapserFactory#getRequestCollapser()
,获得 RequestCollapser 。在 「3. RequestCollapserFactory」 详细解析。第 29 行 :提交单个命令请求到请求队列( RequestQueue ),即命令合并执行整体流程第一步。在 「4. RequestCollapser」 详细解析。
2.3 核心方法
#getRequestArgument(...)
抽象方法,获得单个命令参数。代码如下 :
public abstract RequestArgumentType getRequestArgument();
#createCommand(...)
抽象方法,将多个命令请求合并,创建一个 HystrixCommand 。代码如下 :
protected abstract HystrixCommand<BatchReturnType> createCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
#mapResponseToRequests(...)
抽象方法,将一个 HystrixCommand 的执行结果,映射回对应的命令请求们。
protected abstract void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
#shardRequests(...)
方法,将多个命令请求分片成 N 个【多个命令请求】。默认实现下,不进行分片。代码如下 :
protected Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shardRequests(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
return Collections.singletonList(requests);
}
在未重写
#shardRequests(...)
的情况下,整体方法流程如下 :在重写
#shardRequests(...)
的情况下,整体方法流程如下 :本图中命令请求分片仅仅是例子,实际根据重写的逻辑不同而不同。
3. RequestCollapserFactory
com.netflix.hystrix.collapser.RequestCollapserFactory
,RequestCollapser 工厂。
public class RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType> {
private final CollapserTimer timer;
private final HystrixCollapserKey collapserKey;
private final HystrixCollapserProperties properties;
private final HystrixConcurrencyStrategy concurrencyStrategy;
private final Scope scope;
public RequestCollapserFactory(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties properties) {
/* strategy: ConcurrencyStrategy */
this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
this.timer = timer;
this.scope = scope;
this.collapserKey = collapserKey;
this.properties = properties;
}
timer
属性,命令合并器的定时器,在 「5. CollapserTimer」 详细解析。collapserKey
属性,命令合并器标识,实现类似 HystrixThreadPoolKey 。HystrixCollapserKey ,点击 链接 查看代码。
HystrixThreadPoolKey ,在 《Hystrix 源码解析 —— 命令执行(二)之执行隔离策略》「3. HystrixThreadPoolKey」 有详细解析。
properties
属性,命令合并器属性配置。concurrencyStrategy
属性,并发策略,在 《Hystrix 源码解析 —— 命令执行(二)之执行隔离策略》「4. HystrixConcurrencyStrategy」 有详细解析。scope
属性,命令请求作用域。目前有两种作用域 :REQUEST
:请求上下文( HystrixRequestContext )。Typically this means that requests within a single user-request (ie. HTTP request) are collapsed.
No interaction with other user requests.
1 queue per user request.GLOBAL
:全局。Requests from any thread (ie. all HTTP requests) within the JVM will be collapsed.
1 queue for entire app.
调用 #getRequestCollapser()
方法,获得 RequestCollapser 。代码如下 :
public RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> getRequestCollapser(HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser) {
if (Scopes.REQUEST == Scopes.valueOf(getScope().name())) {
return getCollapserForUserRequest(commandCollapser);
} else if (Scopes.GLOBAL == Scopes.valueOf(getScope().name())) {
return getCollapserForGlobalScope(commandCollapser);
} else {
logger.warn("Invalid Scope: {} Defaulting to REQUEST scope.", getScope());
return getCollapserForUserRequest(commandCollapser);
}
}
根据
scope
不同,调用两个不同方法,获得 RequestCollapser 。这两个方法大体逻辑相同,优先从缓存中查找满足条件的 RequestCollapser 返回;若不存在,则创建满足条件的 RequestCollapser 添加到缓存并返回。REQUEST
:调用#getCollapserForUserRequest()
方法,TODO 【2012】【请求上下文】。GLOBAL
:调用#getCollapserForGlobalScope()
方法,点击 链接 查看中文注释的代码。
4. RequestCollapser
com.netflix.hystrix.collapser.RequestCollapser
,命令请求合并器。主要用于 :
提交单个命令请求到请求队列( RequestQueue )。
接收来自定时任务提交的多个命令,合并执行。
4.1 构造方法
RequestCollapser 构造方法,代码如下 :
public class RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> {
private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser;
// batch can be null once shutdown
private final AtomicReference<RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>> batch = new AtomicReference<RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>>();
private final AtomicReference<Reference<TimerListener>> timerListenerReference = new AtomicReference<Reference<TimerListener>>();
private final AtomicBoolean timerListenerRegistered = new AtomicBoolean();
private final CollapserTimer timer;
private final HystrixCollapserProperties properties;
private final HystrixConcurrencyStrategy concurrencyStrategy;
RequestCollapser(HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser, HystrixCollapserProperties properties, CollapserTimer timer, HystrixConcurrencyStrategy concurrencyStrategy) {
this.commandCollapser = commandCollapser; // the command with implementation of abstract methods we need
this.concurrencyStrategy = concurrencyStrategy;
this.properties = properties;
this.timer = timer;
batch.set(new RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>(properties, commandCollapser, properties.maxRequestsInBatch().get()));
}
}
commandCollapser
属性,命令合并器包装器。batch
属性,RequestBatch,即是本文一直说的请求队列。在 「4.2 RequestBatch」 也会详细解析。timerListenerReference
属性,注册在命令合并器的定时器的监听器。每个 RequestCollapser 独有一个监听器。该监听器( 实际上会使用该监听器创建定时任务 )固定周期从请求队列获取多个命令执行,提交 RequestCollapser 合并执行。在 「5. CollapserTimer」 也会详细解析。timerListenerRegistered
属性,timerListenerReference
是否已经注册。timer
属性,命令合并器的定时器。properties
属性,命令合并器属性配置。concurrencyStrategy
属性,并发策略。
4.2 RequestBatch
com.netflix.hystrix.collapser.RequestBatch
,命令请求队列。提供如下功能 :
命令请求的添加
命令请求的移除
命令请求的批量执行。笔者把 RequestBatch 解释成 "命令请求队列",主要方便大家理解。
那可能有胖友有疑问,为啥该功能不在 RequestCollapser 直接实现,这样 RequestBatch 成为纯粹的队列呢?在 「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 详细解析。
RequestBatch 构造方法,代码如下 :
public class RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> {
private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser;
private final int maxBatchSize;
private final AtomicBoolean batchStarted = new AtomicBoolean();
private final ConcurrentMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>> argumentMap =
new ConcurrentHashMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>>();
private final HystrixCollapserProperties properties;
private ReentrantReadWriteLock batchLock = new ReentrantReadWriteLock();
public RequestBatch(HystrixCollapserProperties properties, HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser, int maxBatchSize) {
this.properties = properties;
this.commandCollapser = commandCollapser;
this.maxBatchSize = maxBatchSize;
}
}
commandCollapser
属性,命令合并器包装器。maxBatchSize
属性,队列最大长度。batchStarted
属性,执行是否开始。argumentMap
属性,命令请求参数映射( 队列 )。properties
属性,命令合并器属性配置。batchLock
属性,argumentMap
操作的读写锁。
RequestBatch 实现队列具体的操作方法,在 「4.3 #submitRequest(arg)」/「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 一起解析。
4.3 #submitRequest(arg)
在 #toObservable()
方法里,调用 #submitRequest(arg)
方法,提交单个命令请求到 RequestBatch 。代码如下 :
1: public Observable<ResponseType> submitRequest(final RequestArgumentType arg) {
2: /*
3: * We only want the timer ticking if there are actually things to do so we register it the first time something is added.
4: */
5: if (!timerListenerRegistered.get() && timerListenerRegistered.compareAndSet(false, true)) {
6: /* schedule the collapsing task to be executed every x milliseconds (x defined inside CollapsedTask) */
7: timerListenerReference.set(timer.addListener(new CollapsedTask()));
8: }
9:
10: // loop until succeed (compare-and-set spin-loop)
11: while (true) {
12: // 获得 RequestBatch
13: final RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> b = batch.get();
14: if (b == null) {
15: return Observable.error(new IllegalStateException("Submitting requests after collapser is shutdown"));
16: }
17:
18: // 添加到 RequestBatch
19: final Observable<ResponseType> response;
20: if (arg != null) {
21: response = b.offer(arg);
22: } else {
23: response = b.offer( (RequestArgumentType) NULL_SENTINEL);
24: }
25:
26: // 添加成功,返回 Observable
27: // it will always get an Observable unless we hit the max batch size
28: if (response != null) {
29: return response;
30: } else {
31: // 添加失败,执行 RequestBatch ,并创建新的 RequestBatch
32: // this batch can't accept requests so create a new one and set it if another thread doesn't beat us
33: createNewBatchAndExecutePreviousIfNeeded(b);
34: }
35: }
36: }
第 5 至 8 行 :当 RequestCollapser 的监听任务( CollapsedTask )还未创建,进行初始化。
第 11 至 35 行 :死循环,直到提交单个命令请求到 RequestBatch 成功。
第 13 至 16 行 :获得 RequestBatch 。从目前代码看下来,除非 RequestCollapser 被
#shutdown()
后才会出现为null
的情况。第 19 至 24 行 :调动
RequestBatch#offer(...)
方法,提交单个命令请求到 RequestBatch ,并获得 Observable 。这里对arg==null
做了特殊处理,因为RequestBatch.argumentMap
是 ConcurrentHashMap ,不允许值为null
。另外,RequestBatch#offer(...)
方法的实现代码,在结束了当前方法,详细解析。第 28 至 29 行 :添加成功,返回 Observable 。
第 30 至 34 行 :添加失败,执行当前 RequestBatch 的多个命令合并执行,并创建新的 RequestBatch 。在 「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 详细解析。
RequestBatch#offer(...)
方法,代码如下 :
1: public Observable<ResponseType> offer(RequestArgumentType arg) {
2: // 执行已经开始,添加失败
3: /* short-cut - if the batch is started we reject the offer */
4: if (batchStarted.get()) {
5: return null;
6: }
7:
8: /*
9: * The 'read' just means non-exclusive even though we are writing.
10: */
11: if (batchLock.readLock().tryLock()) {
12: try {
13: // 执行已经开始,添加失败
14: /* double-check now that we have the lock - if the batch is started we reject the offer */
15: if (batchStarted.get()) {
16: return null;
17: }
18:
19: // 超过队列最大长度,添加失败
20: if (argumentMap.size() >= maxBatchSize) {
21: return null;
22: } else {
23: // 创建 CollapsedRequestSubject ,并添加到队列
24: CollapsedRequestSubject<ResponseType, RequestArgumentType> collapsedRequest = new CollapsedRequestSubject<ResponseType, RequestArgumentType>(arg, this);
25: final CollapsedRequestSubject<ResponseType, RequestArgumentType> existing = (CollapsedRequestSubject<ResponseType, RequestArgumentType>) argumentMap.putIfAbsent(arg, collapsedRequest);
26: /**
27: * If the argument already exists in the batch, then there are 2 options:
28: * A) If request caching is ON (the default): only keep 1 argument in the batch and let all responses
29: * be hooked up to that argument
30: * B) If request caching is OFF: return an error to all duplicate argument requests
31: *
32: * This maintains the invariant that each batch has no duplicate arguments. This prevents the impossible
33: * logic (in a user-provided mapResponseToRequests for HystrixCollapser and the internals of HystrixObservableCollapser)
34: * of trying to figure out which argument of a set of duplicates should get attached to a response.
35: *
36: * See https://github.com/Netflix/Hystrix/pull/1176 for further discussion.
37: */
38: if (existing != null) {
39: boolean requestCachingEnabled = properties.requestCacheEnabled().get();
40: if (requestCachingEnabled) {
41: return existing.toObservable();
42: } else {
43: return Observable.error(new IllegalArgumentException("Duplicate argument in collapser batch : [" + arg + "] This is not supported. Please turn request-caching on for HystrixCollapser:" + commandCollapser.getCollapserKey().name() + " or prevent duplicates from making it into the batch!"));
44: }
45: } else {
46: return collapsedRequest.toObservable();
47: }
48:
49: }
50: } finally {
51: batchLock.readLock().unlock();
52: }
53: } else {
54: return null;
55: }
56: }
第 4 至 6 行 :执行已经开始,添加失败。在
RequestBatch#executeBatchIfNotAlreadyStarted(...)
方法的开头,优先 CAS 使batchStarted=true
。第 11 行 :获得读锁。
The'read'just means non-exclusive even though we are writing.
,即使该方法实际在做"写操作",不排他,线程安全,所以可以使用读锁。第 15 至 17 行 :
double-check
,执行已经开始,添加失败。在RequestBatch#executeBatchIfNotAlreadyStarted(...)
方法,优先 CAS 使batchStarted=true
,再获取写锁,所以会出现该情况。第 20 至 21 行 :超过队列最大长度,添加失败。
第 24 至 25 行 :创建
com.netflix.hystrix.collapser.CollapsedRequestSubject
,并将它添加到队列(argumentMap
) 。argument
属性,单个命令请求参数。valueSet
属性,结果( Response ) 是否设置,通过#setResponse()
/#emitResponse()
方法设置。subject
属性,可回放执行结果的 Subject 。此处使用 ReplaySubject 的主要目的,当 HystrixCollapser 开启缓存功能时,通过回放执行结果,在 《Hystrix 源码解析 —— 执行结果缓存》「5. HystrixCachedObservable」 也有相同的实现。另外,这里有一点要注意下,ReplaySubject 并没有向任何 Observable 订阅结果,而是通过#setResponse()
/#emitResponse()
方法设置结果。outstandingSubscriptions
属性,订阅数量。subjectWithAccounting
属性,带订阅数量的 ReplaySubject 。当取消订阅时,调用RequestBatch#remove(arg)
方法,移除单个命令请求。CollapsedRequestSubject 实现
com.netflix.hystrix.HystrixCollapser.CollapsedRequest
接口,定义了批量命令执行的请求,不仅限于获得请求参数(#getArgument()
方法 ),也包括对批量命令执行结束后,每个请求的结果设置(#setResponse(...)
/#emitResponse(...)
/#setException(...)
/#setComplete()
方法 ),点击 链接 查看该接口的代码。CollapsedRequestSubject 构造方法,代码如下:
/* package */class CollapsedRequestSubject<T, R> implements CollapsedRequest<T, R> {
/**
* 参数
*/
private final R argument;
/**
* 结果( response ) 是否设置
*/
private AtomicBoolean valueSet = new AtomicBoolean(false);
/**
* 可回放的 ReplaySubject
*/
private final ReplaySubject<T> subject = ReplaySubject.create();
/**
* 带订阅数量的 ReplaySubject
*/
private final Observable<T> subjectWithAccounting;
/**
* 订阅数量
*/
private volatile int outstandingSubscriptions = 0;
public CollapsedRequestSubject(final R arg, final RequestBatch<?, T, R> containingBatch) {
// 设置 argument
if (arg == RequestCollapser.NULL_SENTINEL) {
this.argument = null;
} else {
this.argument = arg;
}
// 设置 带订阅数量的 ReplaySubject
this.subjectWithAccounting = subject
.doOnSubscribe(new Action0() {
@Override
public void call() {
outstandingSubscriptions++;
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
outstandingSubscriptions--;
if (outstandingSubscriptions == 0) {
containingBatch.remove(arg);
}
}
});
}
}
第 38 至 47 行 :返回 Observable 。
当
argumentMap
已经存在arg
对应的 Observable 时,必须开启缓存 (HystrixCollapserProperties.requestCachingEnabled=true
) 功能。原因是,如果在相同的arg
,并且未开启缓存,同时第 43 行实现的是collapsedRequest.toObservable()
,那么相同的arg
将有多个 Observable 执行命令,此时HystrixCollapserBridge#mapResponseToRequests(...)
方法无法将执行( Response )赋值到arg
对应的命令请求( CollapsedRequestSubject ) 。更多讨论,见 https://github.com/Netflix/Hystrix/pull/1176 。回过头看
HystrixCollapser#toObservable()
方法的第 32 至 41 行的代码,这里也有对缓存功能,是不是重复了呢?argumentMap
针对的是 RequestBatch 级的缓存,HystrixCollapser : RequestCollapser : RequestBatch 是1:1:N
的关系,通过HystrixCollapser#toObservable()
对缓存的处理逻辑,保证 RequestBatch 切换后,依然有缓存。
RequestBatch#remove()
方法,代码如下 :
/* package-private */ void remove(RequestArgumentType arg) {
if (batchStarted.get()) {
//nothing we can do
return;
}
if (batchLock.readLock().tryLock()) {
try {
/* double-check now that we have the lock - if the batch is started, deleting is useless */
if (batchStarted.get()) {
return;
}
argumentMap.remove(arg);
} finally {
batchLock.readLock().unlock();
}
}
}
当 RequestBatch 开始执行,不允许移除单个命令请求。
4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)
本小节建议在 「5. CollapserTimer」 后,再回过头看。
#createNewBatchAndExecutePreviousIfNeeded(previousBatch)
方法,代码如下 :
1: private void createNewBatchAndExecutePreviousIfNeeded(RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> previousBatch) {
2: if (previousBatch == null) {
3: throw new IllegalStateException("Trying to start null batch which means it was shutdown already.");
4: }
5: if (batch.compareAndSet(previousBatch, new RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>(properties, commandCollapser, properties.maxRequestsInBatch().get()))) {
6: // this thread won so trigger the previous batch
7: previousBatch.executeBatchIfNotAlreadyStarted();
8: }
9: }
第 5 行 :通过 CAS 修改
batch
,保证并发情况下的线程安全。同时注意,此处也进行了新的 RequestBatch ,切换掉老的 RequestBatch 。第 6 行 :使用老的 RequestBatch ,调用
RequestBatch#executeBatchIfNotAlreadyStarted()
方法,命令合并执行。
RequestBatch#executeBatchIfNotAlreadyStarted()
方法,代码如下 :
1: public void executeBatchIfNotAlreadyStarted() {
2: /*
3: * - check that we only execute once since there's multiple paths to do so (timer, waiting thread or max batch size hit)
4: * - close the gate so 'offer' can no longer be invoked and we turn those threads away so they create a new batch
5: */
6: // 设置 执行已经开始
7: if (batchStarted.compareAndSet(false, true)) {
8: // 获得 写锁
9: /* wait for 'offer'/'remove' threads to finish before executing the batch so 'requests' is complete */
10: batchLock.writeLock().lock();
11:
12: try {
13: // 将多个命令请求分片成 N 个【多个命令请求】。
14: // shard batches
15: Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = commandCollapser.shardRequests(argumentMap.values());
16: // for each shard execute its requests
17: for (final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> shardRequests : shards) {
18: try {
19: // 将多个命令请求合并,创建一个 HystrixCommand
20: // create a new command to handle this batch of requests
21: Observable<BatchReturnType> o = commandCollapser.createObservableCommand(shardRequests);
22:
23: // 将一个 HystrixCommand 的执行结果,映射回对应的命令请求们
24: commandCollapser.mapResponseToRequests(o, shardRequests).doOnError(new Action1<Throwable>() {
25:
26: /**
27: * This handles failed completions
28: */
29: @Override
30: public void call(Throwable e) {
31: // handle Throwable in case anything is thrown so we don't block Observers waiting for onError/onCompleted
32: Exception ee;
33: if (e instanceof Exception) {
34: ee = (Exception) e;
35: } else {
36: ee = new RuntimeException("Throwable caught while executing batch and mapping responses.", e);
37: }
38: logger.debug("Exception mapping responses to requests.", e);
39: // if a failure occurs we want to pass that exception to all of the Futures that we've returned
40: for (CollapsedRequest<ResponseType, RequestArgumentType> request : argumentMap.values()) {
41: try {
42: ((CollapsedRequestSubject<ResponseType, RequestArgumentType>) request).setExceptionIfResponseNotReceived(ee);
43: } catch (IllegalStateException e2) {
44: // if we have partial responses set in mapResponseToRequests
45: // then we may get IllegalStateException as we loop over them
46: // so we'll log but continue to the rest
47: logger.error("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting Exception. Continuing ... ", e2);
48: }
49: }
50: }
51:
52: }).doOnCompleted(new Action0() {
53:
54: /**
55: * This handles successful completions
56: */
57: @Override
58: public void call() {
59: // check that all requests had setResponse or setException invoked in case 'mapResponseToRequests' was implemented poorly
60: Exception e = null;
61: for (CollapsedRequest<ResponseType, RequestArgumentType> request : shardRequests) {
62: try {
63: e = ((CollapsedRequestSubject<ResponseType, RequestArgumentType>) request).setExceptionIfResponseNotReceived(e,"No response set by " + commandCollapser.getCollapserKey().name() + " 'mapResponseToRequests' implementation.");
64: } catch (IllegalStateException e2) {
65: logger.debug("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting 'No response set' Exception. Continuing ... ", e2);
66: }
67: }
68: }
69:
70: }).subscribe();
71:
72: } catch (Exception e) {
73: // 异常
74: logger.error("Exception while creating and queueing command with batch.", e);
75: // if a failure occurs we want to pass that exception to all of the Futures that we've returned
76: for (CollapsedRequest<ResponseType, RequestArgumentType> request : shardRequests) {
77: try {
78: request.setException(e);
79: } catch (IllegalStateException e2) {
80: logger.debug("Failed trying to setException on CollapsedRequest", e2);
81: }
82: }
83: }
84: }
85:
86: } catch (Exception e) {
87: // 异常
88: logger.error("Exception while sharding requests.", e);
89: // same error handling as we do around the shards, but this is a wider net in case the shardRequest method fails
90: for (CollapsedRequest<ResponseType, RequestArgumentType> request : argumentMap.values()) {
91: try {
92: request.setException(e);
93: } catch (IllegalStateException e2) {
94: logger.debug("Failed trying to setException on CollapsedRequest", e2);
95: }
96: }
97: } finally {
98: batchLock.writeLock().unlock();
99: }
100: }
101: }
代码看起来是有点长哈,请对照着官方示例 CommandCollapserGetValueForKey 一起看,临门一脚了,胖友!
第 7 行 :通过 CAS 修改
batchStarted
,保证并发情况下的线程安全。第 10 行 :获得写锁。等待调用
#offer(...)
/#remove(...)
方法的线程执行完成,以保证命令合并执行时,不再有新的请求添加或移除。第 15 行 :调用
HystrixCollapserBridge#shardRequests(...)
方法,将多个命令请求分片成 N 个【多个命令请求】。默认实现下,不进行分片。点击 链接 查看代码。第 17 行 :循环 N 个【多个命令请求】。
第 21 行 :调用
HystrixCollapserBridge#createObservableCommand(...)
方法,将多个命令请求合并,创建一个 HystrixCommand 。点击 链接 查看代码。第 24 行 :调用
HystrixCollapserBridge#mapResponseToRequests(...)
方法,将一个 HystrixCommand 的执行结果,映射回对应的命令请求们。点击 链接 查看代码。Observable#single()
方法,如果 Observable 终止时只发射了一个值,返回那个值,否则抛出异常。在 《ReactiveX文档中文翻译》「single」 有相关分享。Observable#ignoreElements()
方法,抑制原始 Observable 发射的所有数据,只允许它的终止通知(#onError()
或#onCompleted()
)通过。在 《ReactiveX文档中文翻译》「IgnoreElements」 有相关分享。也推荐点击rx.internal.operators.OperatorIgnoreElements
看下源码,可能更加易懂。Observable#cast()
方法,将原始 Observable 发射的每一项数据都强制转换为一个指定的类型,然后再发射数据,它是map
的一个特殊版本。在 《ReactiveX文档中文翻译》「cast」 有相关分享。也推荐点击rx.internal.operators.OperatorCast
看下源码,可能更加易懂。使用
Observable#ignoreElements()
/Observable#cast()
方法,用于将 Observable 变成不再继续向下发射数据项,只给现有方法里Observable#doNext()
处理数据项,调用HystrixCollapser#mapResponseToRequests(...)
方法。点击 链接 ,查看
CollapsedRequestSubject#setResponse(response)
方法的代码。第 24 至 50 行 :调用
Observable#doError(Action1)
方法,当命令合并执行发生异常时,设置每个CollapsedRequestSubject 的执行结果为异常。点击 链接,查看
CollapsedRequestSubject#setResponse(response)
方法的代码。第 52 至 68 行 :调用
Observable#doOnCompleted(Action0)
方法,当命令合并执行完成时,检查每个CollapsedRequestSubject 是否都有返回结果。设置没有返回结果的 CollapsedRequestSubject 的执行结果为异常。一般情况下,是用户实现HystrixCollapser#mapResponseToRequests(...)
方法存在 BUG 。另外,如果不设置,将导致无结果的单个命令请求无限阻塞。第 70 行 :调用
Observable#subscribe()
方法,触发 HystrixCommand 执行。第 72 至 96 行 :发生异常,设置每个 CollapsedRequestSubject 的执行结果为异常。
点击 链接,查看
CollapsedRequestSubject#setException(response)
方法的代码。第 97 至 99 行 :释放写锁。
5. CollapserTimer
com.netflix.hystrix.collapser.CollapserTimer
,命令合并器的定时器接口,定义了提交定时监听器,生成定时任务的接口方法,代码如下 :
public interface CollapserTimer {
Reference<TimerListener> addListener(TimerListener collapseTask);
}
5.1 RealCollapserTimer
com.netflix.hystrix.collapser.RealCollapserTimer
,命令合并器的定时器实现类,代码如下 :
public class RealCollapserTimer implements CollapserTimer {
/* single global timer that all collapsers will schedule their tasks on */
private final static HystrixTimer timer = HystrixTimer.getInstance();
@Override
public Reference<TimerListener> addListener(TimerListener collapseTask) {
return timer.addTimerListener(collapseTask);
}
}
实际上,使用的是 HystrixTimer 提供的单例。在 《Hystrix 源码解析 —— 执行结果缓存》「3. HystrixTimer 」 有详细解析。
5.2 CollapsedTask
com.netflix.hystrix.collapser.RequestCollapser.CollapsedTask
,定时任务,固定周期( 可配,默认 HystrixCollapserProperties.timerDelayInMilliseconds=10ms
) 轮询其对应的一个 RequestCollapser 当前RequestBatch 。若有命令需要执行,则提交 RequestCollapser 合并执行。
代码比较简单,点击 链接 直接看代码。
666. 彩蛋
T T 一开始把命令合并执行,理解成类似线程池批量执行任务,怎么看官方示例,怎么奇怪。有一样的同学,一起泪目 + 握爪下。
本文有点点长,实在不想拆分成多篇。
恩,另外部分地方写的不够清晰,欢迎一起讨论和优化。
胖友,分享一波朋友圈可好!