熔断器 Hystrix 源码解析 —— 命令执行(二)之执行隔离策略
摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-second-isolation-strategy/ 「芋道源码」欢迎转载,保留摘要,谢谢!
友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。
友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。
友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。
本文主要基于 Hystrix 1.5.X 版本
1. 概述
2. HystrixThreadPoolProperties
3. HystrixThreadPoolKey
4. HystrixConcurrencyStrategy
5. HystrixThreadPool
6. HystrixScheduler
666. 彩蛋
1. 概述
本文主要分享 Hystrix 命令执行(二)之执行隔离策略。
建议 :对 RxJava 已经有一定的了解的基础上阅读本文。
Hystrix 提供两种执行隔离策略( ExecutionIsolationStrategy ) :
SEMAPHORE
:信号量,命令在调用线程执行。在《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》「3. TryableSemaphore」 已经详细解析。THREAD
:线程池,命令在线程池执行。在《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》「5. #executeCommandWithSpecifiedIsolation(...)」 的#executeCommandWithSpecifiedIsolation(...)
方法中,调用Observable#subscribeOn(Scheduler)
方法,指定在 RxJava Scheduler 执行。如果你暂时不了解 Scheduler ,可以阅读 《RxJava 源码解析 —— Scheduler》 。
如果你暂时不了解
Observable#subscribeOn(Scheduler)
,可以阅读 《RxJava 源码解析 —— Observable#subscribeOn(Scheduler)》 。
两种方式的优缺点比较,推荐阅读 《【翻译】Hystrix文档-实现原理》「依赖隔离」。
推荐 Spring Cloud 书籍:
请支持正版。下载盗版,等于主动编写低级 BUG 。
程序猿DD —— 《Spring Cloud微服务实战》
周立 —— 《Spring Cloud与Docker微服务架构实战》
两书齐买,京东包邮。
2. HystrixThreadPoolProperties
com.netflix.hystrix.HystrixThreadPoolProperties
,Hystrix 线程池属性配置抽象类,点击 链接 查看,已添加中文注释说明。
com.netflix.hystrix.strategy.properties.HystrixPropertiesThreadPoolDefault
,Hystrix 线程池配置实现类,点击 链接 查看。实际上没什么内容,官方如是说 :
Default implementation of {@link HystrixThreadPoolProperties} using Archaius (https://github.com/Netflix/archaius)
3. HystrixThreadPoolKey
com.netflix.hystrix.HystrixThreadPoolKey
,Hystrix 线程池标识接口。
FROM HystrixThreadPoolKey 接口注释
A key to represent a {@link HystrixThreadPool} for monitoring, metrics publishing, caching and other such uses.
This interface is intended to work natively with Enums so that implementing code can be an enum that implements this interface.
直白的说 ,希望通过相同的
name
( 标识 ) 获得同 HystrixThreadPoolKey 对象。通过在内部维持一个name
与 HystrixThreadPoolKey 对象的映射,以达到枚举的效果。
HystrixThreadPoolKey 代码如下 :
1: public interface HystrixThreadPoolKey extends HystrixKey {
2: class Factory {
3: private Factory() {
4: }
5:
6: // used to intern instances so we don't keep re-creating them millions of times for the same key
7: private static final InternMap<String, HystrixThreadPoolKey> intern
8: = new InternMap<String, HystrixThreadPoolKey>(
9: new InternMap.ValueConstructor<String, HystrixThreadPoolKey>() {
10: @Override
11: public HystrixThreadPoolKey create(String key) {
12: return new HystrixThreadPoolKeyDefault(key);
13: }
14: });
15:
16: public static HystrixThreadPoolKey asKey(String name) {
17: return intern.interned(name);
18: }
19:
20: private static class HystrixThreadPoolKeyDefault extends HystrixKeyDefault implements HystrixThreadPoolKey {
21: public HystrixThreadPoolKeyDefault(String name) {
22: super(name);
23: }
24: }
25:
26: /* package-private */ static int getThreadPoolCount() {
27: return intern.size();
28: }
29: }
30: }
HystrixThreadPoolKey 实现
com.netflix.hystrix.HystrixKey
接口,点击 链接 查看。该接口定义的#name()
方法,即是上文我们所说的标识( Key )。intern
属性,name
与 HystrixThreadPoolKey 对象的映射,以达到枚举的效果。com.netflix.hystrix.util.InternMap
,点击 链接 查看带中文注释的代码。#asKey(name)
方法,从intern
获得 HystrixThreadPoolKey 对象。#getThreadPoolCount()
方法,获得 HystrixThreadPoolKey 数量。
在 AbstractCommand 构造方法里,初始化命令的 threadPoolKey
属性,代码如下 :
protected final HystrixThreadPoolKey threadPoolKey;
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
// ... 省略无关代码
this.commandGroup = initGroupKey(group);
this.commandKey = initCommandKey(key, getClass());
this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
// 初始化 threadPoolKey
this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
}
调用
#initThreadPoolKey(...)
方法,创建最终的threadPoolKey
属性。代码如下 :优先级 :
threadPoolKeyOverride
>threadPoolKey
>groupKey
private static HystrixThreadPoolKey initThreadPoolKey(HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, String threadPoolKeyOverride) {
if (threadPoolKeyOverride == null) {
// we don't have a property overriding the value so use either HystrixThreadPoolKey or HystrixCommandGroup
if (threadPoolKey == null) {
/* use HystrixCommandGroup if HystrixThreadPoolKey is null */
return HystrixThreadPoolKey.Factory.asKey(groupKey.name());
} else {
return threadPoolKey;
}
} else { // threadPoolKeyOverride 可覆盖属性
// we have a property defining the thread-pool so use it instead
return HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride);
}
}
4. HystrixConcurrencyStrategy
com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy
,Hystrix 并发策略抽象类。
HystrixConcurrencyStrategy#getThreadPool(...)
方法,代码如下 :
1: public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
2: final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
3:
4: final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
5: final int dynamicCoreSize = threadPoolProperties.coreSize().get();
6: final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
7: final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
8:
9: final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
10:
11: if (allowMaximumSizeToDivergeFromCoreSize) {
12: final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
13: if (dynamicCoreSize > dynamicMaximumSize) {
14: logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
15: dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
16: dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
17: return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
18: } else {
19: return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
20: }
21: } else {
22: return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
23: }
24: }
第 2 行 :调用
#getThreadFactory(...)
方法,获得 ThreadFactory 。点击 链接 查看方法代码。PlatformSpecific#getAppEngineThreadFactory()
方法,无需细看,适用于 Google App Engine 场景。第 4 至 7 行 :「2. HystrixThreadPoolProperties」 有详细解析。
第 9 行 :调用
#getBlockingQueue()
方法,获得线程池的阻塞队列。点击 链接 查看方法代码。《Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析》
《Java并发包中的同步队列SynchronousQueue实现原理》
当
maxQueueSize<=0
时( 默认值 :-1
) 时,使用 SynchronousQueue 。超过线程池的maximumPoolSize
时,提交任务被拒绝。当
SynchronousQueue>0
时,使用 LinkedBlockingQueue 。超过线程池的maximumPoolSize
时,任务被拒绝。超过线程池的maximumPoolSize
+ 线程池队列的maxQueueSize
时,提交任务被阻塞等待。推荐 :《聊聊并发(三)——JAVA线程池的分析和使用》
推荐 :《聊聊并发(七)——Java中的阻塞队列》
第 11 至 23 行 :创建 ThreadPoolExecutor 。看起来代码比较多,根据
allowMaximumSizeToDivergeFromCoreSize
的情况,计算线程池的maximumPoolSize
属性。计算的方式和HystrixThreadPoolProperties#actualMaximumSize()
方法是一致的。
com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault
,Hystrix 并发策略实现类。代码如下( 基本没做啥 ) :
public class HystrixConcurrencyStrategyDefault extends HystrixConcurrencyStrategy {
/**
* 单例
*/
private static HystrixConcurrencyStrategyDefault INSTANCE = new HystrixConcurrencyStrategyDefault();
public static HystrixConcurrencyStrategy getInstance() {
return INSTANCE;
}
private HystrixConcurrencyStrategyDefault() {
}
}
在 AbstractCommand 构造方法里,初始化命令的 threadPoolKey
属性,代码如下 :
protected final HystrixConcurrencyStrategy concurrencyStrategy;
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
// ... 省略无关代码
// 初始化 并发策略
this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
}
HystrixPlugins ,Hystrix 插件体系,https://github.com/Netflix/Hystrix/wiki/Plugins 有详细解析。
调用
HystrixPlugins#getConcurrencyStrategy()
获得 HystrixConcurrencyStrategy 对象。默认情况下,使用 HystrixConcurrencyStrategyDefault 。当然你也可以参考 Hystrix 插件体系,实现自定义的 HystrixConcurrencyStrategy 实现,以达到覆写#getThreadPool()
,#getBlockingQueue()
等方法。点击 链接 查看该方法代码。
5. HystrixThreadPool
com.netflix.hystrix.HystrixThreadPool
,Hystrix 线程池接口。当 Hystrix 命令使用 THREAD
执行隔离策略时, HystrixCommand#run()
方法在线程池执行。点击 链接 查看。HystrixThreadPool 定义接口如下 :
#getExecutor()
:获得 ExecutorService 。#getScheduler()
/#getScheduler(Func0<Boolean>)
:获得 RxJava Scheduler 。#isQueueSpaceAvailable()
:线程池队列是否有空余。#markThreadExecution()
/#markThreadCompletion()
/#markThreadRejection()
:TODO 【2002】【metrics】
5.1 HystrixThreadPoolDefault
com.netflix.hystrix.HystrixThreadPool.HystrixThreadPoolDefault
,Hystrix 线程池实现类。
构造方法,代码如下 :
1: private final HystrixThreadPoolProperties properties;
2: private final BlockingQueue<Runnable> queue;
3: private final ThreadPoolExecutor threadPool;
4: private final HystrixThreadPoolMetrics metrics;
5: private final int queueSize;
6:
7: public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
8: // 初始化 HystrixThreadPoolProperties
9: this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
10: // 获得 HystrixConcurrencyStrategy
11: HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
12: // 队列大小
13: this.queueSize = properties.maxQueueSize().get();
14:
15: // TODO 【2002】【metrics】
16: this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
17: concurrencyStrategy.getThreadPool(threadPoolKey, properties), // 初始化 ThreadPoolExecutor
18: properties);
19:
20: // 获得 ThreadPoolExecutor
21: this.threadPool = this.metrics.getThreadPool();
22: this.queue = this.threadPool.getQueue(); // 队列
23:
24: // TODO 【2002】【metrics】
25: /* strategy: HystrixMetricsPublisherThreadPool */
26: HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
27: }
第 9 行 :初始化 HystrixThreadPoolProperties 。
第 11 行 :初始化 HystrixConcurrencyStrategy 。
第 13 行 :初始化
queueSize
。第 16 至 18 行 :TODO 【2002】【metrics】
第 17 行 :调用
HystrixConcurrencyStrategy#getThreadPool(...)
方法,初始化 ThreadPoolExecutor 。第 21 行 :获得 ThreadPoolExecutor 。
第 22 行 :获得 ThreadPoolExecutor 的队列。
第 26 行 :TODO 【2002】【metrics】
#getExecutor()
方法,代码如下 :
@Override
public ThreadPoolExecutor getExecutor() {
touchConfig();
return threadPool;
}
调用
#touchConfig()
方法,动态调整threadPool
的coreSize
/maximumSize
/keepAliveTime
参数。点击 链接 查看该方法。
#getScheduler()
/ #getScheduler(Func0<Boolean>)
方法,代码如下 :
@Override
public Scheduler getScheduler() {
//by default, interrupt underlying threads on timeout
return getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
return true;
}
});
}
@Override
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
touchConfig();
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}
HystrixContextScheduler 和
shouldInterruptThread
都在 「6. HystrixContextScheduler」 详细解析。
#isQueueSpaceAvailable()
方法,代码如下 :
@Override
public boolean isQueueSpaceAvailable() {
if (queueSize <= 0) {
// we don't have a queue so we won't look for space but instead
// let the thread-pool reject or not
return true;
} else {
return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
}
}
由于线程池的队列大小不能动态调整,该方法的实现通过
HystrixThreadPoolProperties.queueSizeRejectionThreshold
属性控制。注意
queueSize
属性,决定了线程池的队列类型。queueSize<=0
时,#isQueueSpaceAvailable()
都返回true
的原因是,线程池使用 SynchronousQueue 作为队列,不支持新任务排队,任务超过线程池的maximumPoolSize
时,新任务被拒绝。queueSize>0
时,#isQueueSpaceAvailable()
根据情况true
/false
的原因是,线程池使用 LinkedBlockingQueue 作为队列,支持一定数量的阻塞排队,但是这个数量无法调整。通过#isQueueSpaceAvailable()
方法的判断,动态调整。另外,初始配置的queueSize
要相对大,否则即使queueSizeRejectionThreshold
配置的大于queueSize
,实际提交任务到线程池,也会被拒绝。
5.2 Factory
com.netflix.hystrix.HystrixThreadPool.Factory
,HystrixThreadPool 工厂类,不仅限于 HystrixThreadPool 的创建,也提供了 HystrixThreadPool 的管理( HystrixThreadPool 的容器 )。
threadPools
属性,维护创建的 HystrixThreadPool 对应的映射,代码如下 :
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
Key 为
HystrixThreadPoolKey#name()
,每个 HystrixThreadPoolKey 对应一个 HystrixThreadPool 对象。
#getInstance(...)
方法,获得 HystrixThreadPool 对象,代码如下 :
/* package */static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
// get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
String key = threadPoolKey.name();
// this should find it for all but the first time
HystrixThreadPool previouslyCached = threadPools.get(key);
if (previouslyCached != null) {
return previouslyCached;
}
// if we get here this is the first time so we need to initialize
synchronized (HystrixThreadPool.class) {
if (!threadPools.containsKey(key)) {
threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
}
return threadPools.get(key);
}
根据
threadPoolKey
先从threadPool
获取已创建的 HystrixThreadPool ;获取不到,创建对应的 HystrixThreadPool 返回,并添加到threadPool
。
#shutdown()
/ #shutdown(timeout, unit)
方法,比较易懂,点击 链接 查看。
5.3 初始化
在 AbstractCommand 构造方法里,初始化命令的 threadPool
属性,代码如下 :
protected final HystrixThreadPool threadPool;
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
// ... 省略其他代码
// 初始化 threadPoolKey
this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
// 初始化 threadPool
this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
}
调用
#initThreadPool(...)
方法,获得 HystrixThreadPool ,点击 链接 查看。
6. HystrixScheduler
Hystrix 实现了自定义的 RxJava Scheduler ,整体类图如下 :
HystrixContextScheduler ( 实现 RxJava Scheduler 抽象类 ),内嵌类型为 ThreadPoolScheduler ( 实现 RxJava Scheduler抽象类 )的
actualScheduler
属性。HystrixContextWorker ( 实现 RxJava Worker 抽象类 ),内嵌类型为 ThreadPoolWorker ( 实现 RxJava Worker 抽象类 )的
worker
属性。
6.1 HystrixContextScheduler
构造方法,代码如下 :
public class HystrixContextScheduler extends Scheduler {
private final HystrixConcurrencyStrategy concurrencyStrategy;
private final Scheduler actualScheduler;
private final HystrixThreadPool threadPool;
// ... 省略无关代码
public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
this.concurrencyStrategy = concurrencyStrategy;
this.threadPool = threadPool;
this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
}
}
actualScheduler
属性,类型为 ThreadPoolScheduler 。
#createWorker()
方法,代码如下 :
@Override
public Worker createWorker() {
return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}
使用
actualScheduler
创建 ThreadPoolWorker ,传参给 HystrixContextSchedulerWorker 。
6.2 HystrixContextSchedulerWorker
构造方法,代码如下 :
private class HystrixContextSchedulerWorker extends Worker {
private final Worker worker;
// ... 省略无关代码
private HystrixContextSchedulerWorker(Worker actualWorker) {
this.worker = actualWorker;
}
}
worker
属性,类型为 ThreadPoolWorker 。
#schedule(Action0)
方法,代码如下 :
@Override
public Subscription schedule(Action0 action) {
if (threadPool != null) {
if (!threadPool.isQueueSpaceAvailable()) {
throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
}
}
return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}
调用
ThreadPool#isQueueSpaceAvailable()
方法,判断线程池队列是否有空余。这个就是 HystrixContextScheduler 的实际用途。
#unsubscribe()
/ #isUnsubscribed()
方法,使用 worker
判断,点击 链接查看。
6.3 ThreadPoolScheduler
ThreadPoolScheduler 比较简单,点击 链接 查看。
6.4 ThreadPoolWorker
构造方法,代码如下 :
private static class ThreadPoolWorker extends Worker {
private final HystrixThreadPool threadPool;
private final CompositeSubscription subscription = new CompositeSubscription();
private final Func0<Boolean> shouldInterruptThread;
// ... 省略无关代码
public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
this.threadPool = threadPool;
this.shouldInterruptThread = shouldInterruptThread;
}
}
subscription
属性,订阅信息。
#schedule(Action0)
方法,代码如下 :
1: @Override
2: public Subscription schedule(final Action0 action) {
3: // 未订阅,返回
4: if (subscription.isUnsubscribed()) {
5: // don't schedule, we are unsubscribed
6: return Subscriptions.unsubscribed();
7: }
8:
9: // 创建 ScheduledAction
10: // This is internal RxJava API but it is too useful.
11: ScheduledAction sa = new ScheduledAction(action);
12:
13: // 添加到 订阅
14: subscription.add(sa);
15: sa.addParent(subscription);
16:
17: // 提交 任务
18: ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
19: FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
20: sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
21:
22: return sa;
23: }
第 4 至 7 行 :未订阅,返回。
第 11 行 : 创建 ScheduledAction 。在 TODO 【2013】【ScheduledAction】 详细解析。
第 14 至 15 行 :添加到订阅(
subscription
)。第 18 至 20 行 :使用
threadPool
,提交任务,并创建 FutureCompleterWithConfigurableInterrupt 添加到订阅(sa
)。第 22 行 :返回订阅(
sa
)。整体订阅关系如下 :
#unsubscribe()
/ #isUnsubscribed()
方法,使用 subscription
判断,点击 链接查看。
6.5 FutureCompleterWithConfigurableInterrupt
com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler.FutureCompleterWithConfigurableInterrupt
,实现类似 rx.internal.schedulers.ScheduledAction.FutureCompleter
,在它的基础上,支持配置 FutureTask#cancel(Boolean)
是否可打断运行( mayInterruptIfRunning
)。
构造方法,代码如下 :
private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
private final FutureTask<?> f;
private final Func0<Boolean> shouldInterruptThread;
private final ThreadPoolExecutor executor;
// ... 省略无关代码
private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) {
this.f = f;
this.shouldInterruptThread = shouldInterruptThread;
this.executor = executor;
}
}
当命令执行超时,或是主动取消命令执行时,调用 #unsubscribe()
方法,取消执行。
当命令执行超时,或是主动取消命令执行时,调用 #unsubscribe()
方法,取消执行。
当命令执行超时,或是主动取消命令执行时,调用 #unsubscribe()
方法,取消执行。
#unsubscribe()
方法,代码如下 :
@Override
public void unsubscribe() {
// 从 线程池 移除 任务
executor.remove(f);
// 根据 shouldInterruptThread 配置,是否强制取消
if (shouldInterruptThread.call()) {
f.cancel(true);
} else {
f.cancel(false);
}
}
根据
shouldInterruptThread
方法,判断是否强制取消。shouldInterruptThread
对应的方法,实现代码如下 :
subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
当
executionIsolationThreadInterruptOnTimeout=true
时,命令可执行超时。当命令可执行超时时,强制取消。当使用
HystrixCommand.queue()
返回的 Future ,可以使用Future#cancel(Boolean)
取消命令执行。从shouldInterruptThread
对应的方法可以看到,如果此时不满足命令执行超时的条件,命令执行取消的方式是非强制的。此时当executionIsolationThreadInterruptOnFutureCancel=true
时,并且调用Future#cancel(Boolean)
传递mayInterruptIfRunning=true
,强制取消命令执行。模拟测试用例 :
CommandHelloWorld#testAsynchronous3()
HystrixCommand#queue()
:点击 链接 查看Future#cancel(Boolean)
方法。
666. 彩蛋
一边写一边想明白了 RxJava 的一些东西,挺舒服的赶脚。
继续 Go On ~ 周末嗨不停。
胖友,分享一波朋友圈可好!