熔断器 Hystrix 源码解析 —— 请求执行(四)之失败回退逻辑
摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-fourth-fallback/ 「芋道源码」欢迎转载,保留摘要,谢谢!
友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。
友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。
友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。
本文主要基于 Hystrix 1.5.X 版本
1. 概述
2. handleFallback
3. #handleShortCircuitViaFallback()
4. #handleSemaphoreRejectionViaFallback()
5. #handleThreadPoolRejectionViaFallback()
6. #handleTimeoutViaFallback()
7. #handleFailureViaFallback()
8. #getFallbackOrThrowException(...)
666. 彩蛋
1. 概述
本文主要分享 Hystrix 命令执行(四)之失败回退逻辑。
建议 :对 RxJava 已经有一定的了解的基础上阅读本文。
Hystrix 执行命令整体流程如下图:
FROM 《【翻译】Hystrix文档-实现原理》「流程图」
红圈 :Hystrix 命令执行失败,执行回退逻辑。也就是大家经常在文章中看到的“服务降级”。
绿圈 :四种情况会触发失败回退逻辑( fallback )。
第一种 :
short-circuit
,处理链路处于熔断的回退逻辑,在 「3. #handleShortCircuitViaFallback()」 详细解析。第二种 :
semaphore-rejection
,处理信号量获得失败的回退逻辑,在 「4. #handleShortCircuitViaFallback()」 详细解析。第三种 :
thread-pool-rejection
,处理线程池提交任务拒绝的回退逻辑,在 「5. #handleThreadPoolRejectionViaFallback()」 详细解析。第四种 :
execution-timeout
,处理命令执行超时的回退逻辑,在 「6. #handleTimeoutViaFallback()」 详细解析。第五种 :
execution-failure
,处理命令执行异常的回退逻辑,在 「7. #handleFailureViaFallback()」 详细解析。第六种 :
bad-request
,TODO 【2014】【HystrixBadRequestException】,和hystrix-javanica
子项目相关。
另外, #handleXXXX()
方法,整体代码比较类似,最终都是调用 #getFallbackOrThrowException()
方法,获得【回退逻辑 Observable】或者【异常 Observable】,在 「8. #getFallbackOrThrowException(...)」 详细解析。
推荐 Spring Cloud 书籍:
请支持正版。下载盗版,等于主动编写低级 BUG 。
程序猿DD —— 《Spring Cloud微服务实战》
周立 —— 《Spring Cloud与Docker微服务架构实战》
两书齐买,京东包邮。
2. handleFallback
在 《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》「4. #executeCommandAndObserve(...)」 中, #executeCommandAndObserve(...)
的第 82 行 onErrorResumeNext(handleFallback)
代码,通过调用 Observable#onErrorResumeNext(...)
方法,实现【执行命令 Observable】执行异常时,返回【回退逻辑 Observable】,执行失败回退逻辑。
FROM 《ReactiveX文档中文翻译》「onErrorResumeNext」
onErrorResumeNext 方法返回一个镜像原有 Observable 行为的新 Observable ,后者会忽略前者的 onError 调用,不会将错误传递给观察者,作为替代,它会开始镜像另一个,备用的 Observable 。
Javadoc: onErrorResumeNext(Func1))
Javadoc: onErrorResumeNext(Observable))
handleFallback
变量,代码如下 :
1: final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
2: @Override
3: public Observable<R> call(Throwable t) {
4: // 标记尝试成功
5: circuitBreaker.markNonSuccess();
6: // 标记 executionResult 执行异常
7: Exception e = getExceptionFromThrowable(t);
8: executionResult = executionResult.setExecutionException(e);
9: // 返回 【回退逻辑 Observable】
10: if (e instanceof RejectedExecutionException) { // 线程池提交任务拒绝异常
11: return handleThreadPoolRejectionViaFallback(e);
12: } else if (t instanceof HystrixTimeoutException) { // 执行命令超时异常
13: return handleTimeoutViaFallback();
14: } else if (t instanceof HystrixBadRequestException) { // TODO 【2014】【HystrixBadRequestException】
15: return handleBadRequestByEmittingError(e);
16: } else {
17: /*
18: * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
19: */
20: if (e instanceof HystrixBadRequestException) { // TODO 【2014】【HystrixBadRequestException】
21: eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
22: return Observable.error(e);
23: }
24:
25: return handleFailureViaFallback(e);
26: }
27: }
28: };
第 5 行 :标记断路器尝试成功。在 《Hystrix 源码解析 —— 断路器 HystrixCircuitBreaker》 有详细解析。
第 7 至 8 行 :标记
executionResult
执行异常。第 10 至 11 行 :
thread-pool-rejection
,处理线程池提交任务拒绝的回退逻辑,在 「5. #handleThreadPoolRejectionViaFallback()」 详细解析。第 12 至 13 行 :
execution-timeout
,处理命令执行超时的回退逻辑,在 「6. #handleTimeoutViaFallback()」详细解析。第 14 至 23 行 :,
bad-request
,TODO 【2014】【HystrixBadRequestException】,和hystrix-javanica
子项目相关。第 25 行 :
execution-failure
处理命令执行异常的回退逻辑,在 「7. #handleFailureViaFallback()」 详细解析。
3. #handleShortCircuitViaFallback()
#handleShortCircuitViaFallback()
方法, short-circuit
,处理链路处于熔断的回退逻辑,在 此处 被调用,代码如下 :
1: private Observable<R> handleShortCircuitViaFallback() {
2: // TODO 【2011】【Hystrix 事件机制】
3: // record that we are returning a short-circuited fallback
4: eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey);
5: // 标记 executionResult 执行异常
6: // short-circuit and go directly to fallback (or throw an exception if no fallback implemented)
7: Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN");
8: executionResult = executionResult.setExecutionException(shortCircuitException);
9: try {
10: // 获得 【回退逻辑 Observable】 或者 【异常 Observable】
11: return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
12: "short-circuited", shortCircuitException);
13: } catch (Exception e) {
14: return Observable.error(e);
15: }
16: }
第 4 行 :TODO 【2011】【Hystrix 事件机制】
第 7 至 8 行 :标记
executionResult
执行异常。第 11 至 12 行 :调用
#getFallbackOrThrowException()
方法,获得【回退逻辑 Observable】或者【异常 Observable】,在 「8. #getFallbackOrThrowException(...)」 详细解析。第 14 行 :返回【异常 Observable】。
4. #handleSemaphoreRejectionViaFallback()
#handleSemaphoreRejectionViaFallback()
方法, semaphore-rejection
,处理信号量获得失败的回退逻辑,在 此处 被调用,代码如下 :
1: private Observable<R> handleSemaphoreRejectionViaFallback() {
2: // 标记 executionResult 执行异常
3: Exception semaphoreRejectionException = new RuntimeException("could not acquire a semaphore for execution");
4: executionResult = executionResult.setExecutionException(semaphoreRejectionException);
5: // TODO 【2011】【Hystrix 事件机制】
6: eventNotifier.markEvent(HystrixEventType.SEMAPHORE_REJECTED, commandKey);
7: logger.debug("HystrixCommand Execution Rejection by Semaphore."); // debug only since we're throwing the exception and someone higher will do something with it
8: // retrieve a fallback or throw an exception if no fallback available
9: // 获得 【回退逻辑 Observable】 或者 【异常 Observable】
10: return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
11: "could not acquire a semaphore for execution", semaphoreRejectionException);
12: }
第 3 至 4 行 :标记
executionResult
执行异常。第 6 至 7 行 :TODO 【2011】【Hystrix 事件机制】
第 10 至 11 行 :调用
#getFallbackOrThrowException()
方法,获得【回退逻辑 Observable】或者【异常 Observable】,在 「8. #getFallbackOrThrowException(...)」 详细解析。
5. #handleThreadPoolRejectionViaFallback()
#handleThreadPoolRejectionViaFallback()
方法, thread-pool-rejection
,处理线程池提交任务拒绝的回退逻辑,在 此处 被调用,代码如下:
1: private Observable<R> handleThreadPoolRejectionViaFallback(Exception underlying) {
2: // TODO 【2011】【Hystrix 事件机制】
3: eventNotifier.markEvent(HystrixEventType.THREAD_POOL_REJECTED, commandKey);
4: // TODO 【2002】【metrics】
5: threadPool.markThreadRejection();
6: // 获得 【回退逻辑 Observable】 或者 【异常 Observable】
7: // use a fallback instead (or throw exception if not implemented)
8: return getFallbackOrThrowException(this, HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION,
9: "could not be queued for execution", underlying);
10: }
第 3 行 :TODO 【2011】【Hystrix 事件机制】
第 5 行 :TODO 【2002】【metrics】
第 8 至 9 行 :调用
#getFallbackOrThrowException()
方法,获得【回退逻辑 Observable】或者【异常 Observable】,在 「8. #getFallbackOrThrowException(...)」 详细解析。
6. #handleTimeoutViaFallback()
#handleTimeoutViaFallback()
方法, execution-timeout
,处理命令执行超时的回退逻辑,在 此处 被调用,代码如下:
1: private Observable<R> handleTimeoutViaFallback() {
2: // 获得 【回退逻辑 Observable】 或者 【异常 Observable】
3: return getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT,
4: "timed-out", new TimeoutException());
5: }
第 3 至 4 行 :调用
#getFallbackOrThrowException()
方法,获得【回退逻辑 Observable】或者【异常 Observable】,在 「8. #getFallbackOrThrowException(...)」 详细解析。
7. #handleFailureViaFallback()
#handleFailureViaFallback()
方法, execution-failure
,处理命令执行异常的回退逻辑,在 此处 被调用,代码如下:
1: private Observable<R> handleFailureViaFallback(Exception underlying) {
2: // TODO 【2011】【Hystrix 事件机制】
3: /**
4: * All other error handling
5: */
6: logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", underlying);
7:
8: // report failure
9: eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey);
10:
11: // 标记 executionResult 异常 TODO 【2007】【executionResult】用途 为啥不是执行异常
12: // record the exception
13: executionResult = executionResult.setException(underlying);
14: // 获得 【回退逻辑 Observable】 或者 【异常 Observable】
15: return getFallbackOrThrowException(this, HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", underlying);
16: }
第 2 至 9 行 :TODO 【2011】【Hystrix 事件机制】
第 13 行 :标记
executionResult
异常。第 15 行 :调用
#getFallbackOrThrowException()
方法,获得【回退逻辑 Observable】或者【异常 Observable】,在 「8. #getFallbackOrThrowException(...)」 详细解析。
8. #getFallbackOrThrowException(...)
#getFallbackOrThrowException()
方法,获得【回退逻辑 Observable】或者【异常 Observable】,代码如下 :
1: private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
2: // 记录 HystrixRequestContext
3: final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();
4: // 标记 executionResult 添加( 记录 )事件
5: long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
6: // record the executionResult
7: // do this before executing fallback so it can be queried from within getFallback (see See https://github.com/Netflix/Hystrix/pull/144)
8: executionResult = executionResult.addEvent((int) latency, eventType);
9:
10: if (isUnrecoverable(originalException)) { // 无法恢复的异常
11: logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException);
12:
13: // TODO 【2003】【HOOK】
14: /* executionHook for all errors */
15: Exception e = wrapWithOnErrorHook(failureType, originalException);
16: // 返回 【异常 Observable】
17: return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null));
18: } else {
19: if (isRecoverableError(originalException)) { // 可恢复的异常
20: logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException);
21: }
22:
23: if (properties.fallbackEnabled().get()) {
24: /* fallback behavior is permitted so attempt */
25:
26: // 设置 HystrixRequestContext 的 Action
27: final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
28: @Override
29: public void call(Notification<? super R> rNotification) {
30: setRequestContextIfNeeded(requestContext);
31: }
32: };
33:
34: // TODO 【2007】【executionResult】用途
35: final Action1<R> markFallbackEmit = new Action1<R>() {
36: @Override
37: public void call(R r) {
38: if (shouldOutputOnNextEvents()) {
39: executionResult = executionResult.addEvent(HystrixEventType.FALLBACK_EMIT);
40: eventNotifier.markEvent(HystrixEventType.FALLBACK_EMIT, commandKey);
41: }
42: }
43: };
44:
45: // TODO 【2007】【executionResult】用途
46: final Action0 markFallbackCompleted = new Action0() {
47: @Override
48: public void call() {
49: long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
50: eventNotifier.markEvent(HystrixEventType.FALLBACK_SUCCESS, commandKey);
51: executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_SUCCESS);
52: }
53: };
54:
55: // 处理异常 的 Func
56: final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {
57: @Override
58: public Observable<R> call(Throwable t) {
59: // TODO 【2003】【HOOK】
60: /* executionHook for all errors */
61: Exception e = wrapWithOnErrorHook(failureType, originalException);
62: // 获得 Exception
63: Exception fe = getExceptionFromThrowable(t);
64:
65: long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
66: Exception toEmit;
67:
68: if (fe instanceof UnsupportedOperationException) {
69: // TODO 【2011】【Hystrix 事件机制】
70: logger.debug("No fallback for HystrixCommand. ", fe); // debug only since we're throwing the exception and someone higher will do something with it
71: eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey);
72: // 标记 executionResult 添加( 记录 )事件 HystrixEventType.FALLBACK_MISSING
73: executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);
74:
75: // 创建 HystrixRuntimeException
76: toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe);
77: } else {
78: // TODO 【2011】【Hystrix 事件机制】
79: logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe);
80: eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey);
81: // 标记 executionResult 添加( 记录 )事件 HystrixEventType.FALLBACK_FAILURE
82: executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE);
83:
84: // 创建 HystrixRuntimeException
85: toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe);
86: }
87:
88: // NOTE: we're suppressing fallback exception here
89: if (shouldNotBeWrapped(originalException)) {
90: return Observable.error(e);
91: }
92:
93: return Observable.error(toEmit);
94: }
95: };
96:
97: // 获得 TryableSemaphore
98: final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
99:
100: // 信号量释放Action
101: final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
102: final Action0 singleSemaphoreRelease = new Action0() {
103: @Override
104: public void call() {
105: if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
106: fallbackSemaphore.release();
107: }
108: }
109: };
110:
111: Observable<R> fallbackExecutionChain;
112:
113: // acquire a permit
114: if (fallbackSemaphore.tryAcquire()) {
115: try {
116: if (isFallbackUserDefined()) {
117: executionHook.onFallbackStart(this);
118: fallbackExecutionChain = getFallbackObservable();
119: } else {
120: //same logic as above without the hook invocation
121: fallbackExecutionChain = getFallbackObservable();
122: }
123: } catch (Throwable ex) {
124: //If hook or user-fallback throws, then use that as the result of the fallback lookup
125: fallbackExecutionChain = Observable.error(ex);
126: }
127:
128: // 获得 【回退逻辑 Observable】
129: return fallbackExecutionChain
130: .doOnEach(setRequestContext)
131: .lift(new FallbackHookApplication(_cmd)) // TODO 【2003】【HOOK】
132: .lift(new DeprecatedOnFallbackHookApplication(_cmd))
133: .doOnNext(markFallbackEmit)
134: .doOnCompleted(markFallbackCompleted)
135: .onErrorResumeNext(handleFallbackError) //
136: .doOnTerminate(singleSemaphoreRelease)
137: .doOnUnsubscribe(singleSemaphoreRelease);
138: } else {
139: return handleFallbackRejectionByEmittingError();
140: }
141: } else {
142: return handleFallbackDisabledByEmittingError(originalException, failureType, message);
143: }
144: }
145: }
耐心,这个方法看起来灰常长,也仅限于长,理解成难度很小。
第 3 行 :记录 HystrixRequestContext 。
第 5 至 8 行 :标记
executionResult
添加( 记录 )事件。第 10 至 17 行 :调用
#isUnrecoverable(Exception)
方法,若异常不可恢复,直接返回【异常 Observable】。点击 链接 查看该方法。第 19 至 21 行 :调用
#isRecoverableError(Exception)
方法,若异常可恢复,打印WARN
日志。点击 链接 查看该方法。主要针对java.lang.Error
情况,打印#isUnrecoverable(Exception)
排除掉的 Error。【反向】第 141 至 143 行 :当配置
HystrixCommandProperties.fallbackEnabled=false
( 默认值 :true
) ,即失败回退功能关闭,调用#handleFallbackDisabledByEmittingError()
,返回【异常 Observable】。点击 链接 查看该方法。【反向】第 138 至 140 行 :失败回退信号量( TryableSemaphore )【注意,不是正常执行信号量】使用失败,调用
#handleFallbackRejectionByEmittingError()
,返回【异常 Observable】。点击 链接 查看该方法。第 23 行 :当配置
HystrixCommandProperties.fallbackEnabled=true
( 默认值 :true
) ,即失败回退功能开启。第 27 至 32 行 :设置 HystrixRequestContext 的 Action ,使用第 3 行记录的 HystrixRequestContext 。
第 35 至 43 行 :TODO 【2007】【executionResult】用途
第 46 至 53 行 :TODO 【2007】【executionResult】用途
第 56 至 95 行 :处理回退逻辑执行发生异常的 Func1 ,返回【异常 Observable】。
第 61 行 :TODO 【2003】【HOOK】
第 63 行 :调用
#getExceptionFromThrowable(Throwable)
方法,获得 Exception 。若t
的类型为 Throwable 时,包装成 Exception 。点击 链接 查看该方法代码。第 68 至 76 行 :当
fe
的类型为 UnsupportedOperationException 时,使用e
+fe
创建 HystrixRuntimeException 。该异常发生于HystrixCommand#getFallback()
抽象方法未被覆写。第 77 至 86 行 :当
fe
的类型为其他异常时,使用e
+fe
创建 HystrixRuntimeException 。该异常发生于HystrixCommand#getFallback()
执行发生异常。第 89 至 91 行 :调用
#shouldNotBeWrapped()
方法,判断originalException
是 ExceptionNotWrappedByHystrix 的实现时,即要求返回的【异常 Observable】不使用 HystrixRuntimeException 包装。点击 链接 查看该方法代码。第 93 行 :返回【异常 Observable】,使用
toEmit
( HystrixRuntimeException ) 为异常。第 98 行 :调用
#getFallbackSemaphore()
方法,获得失败回退信号量( TryableSemaphore )对象,点击 链接 查看该方法代码。TryableSemaphore 在 《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》「3. TryableSemaphore」 有详细解析。第 100 至 109 行 :信号量释放的 Action。
第 114 至 137 行 :失败回退信号量( TryableSemaphore )使用成功,返回【回退逻辑 Observable】。
第 131 行 :// TODO 【2003】【HOOK】
第 135 行 :调用
Observable#onErrorResumeNext(...)
方法,实现【失败回退 Observable】执行异常时,返回【异常 Observable】。第 116 行 :调用
#isFallbackUserDefined()
方法,返回命令子类是否实现HystrixCommand#getFallback()
抽象方法。只有已实现(true
) 的情况下,调用 HOOK TODO 【2003】【HOOK】。【重要】第 116 至 122 行 :调用
#getFallbackObservable()
方法,创建【回退逻辑 Observable】。将子类对HystrixCommand#getFallback()
抽象方法的执行结果,使用Observable#just(...)
包装返回。点击 链接查看该方法的代码。第 129 至 137 行 :获得 【回退逻辑 Observable】。
有两个注意点:
当命令执行超时时,失败回退逻辑使用的是 HystrixTimer 的线程池。
失败回退逻辑,无超时时间,使用要小心。
666. 彩蛋
比想象中“臭长”的逻辑。
总的来说,逻辑和 《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》 是很类似的。
胖友,分享一波朋友圈可好!