熔断器 Hystrix 源码解析 —— 执行命令方式
本文主要基于 Hystrix 1.5.X 版本
1. 概述
2. 实现
3. BlockingObservable
666. 彩蛋
本文主要分享 Hystrix 执行命令方法。
建议 :对 RxJava 已经有一定的了解的基础上阅读本文。
在官方提供的示例中,我们看到 CommandHelloWorld 通过继承 HystrixCommand 抽象类,有四种调用方式:
方法 | |
#execute() | 同步调用,返回直接结果 |
#queue() | 异步调用,返回 java.util.concurrent.Future |
#observe() | 异步调用,返回 rx.Observable 。向 Observable 注册 rx.Subscriber 处理结果 |
#toObservable() | 未调用,返回 rx.Observable 。向 Observable 注册 rx.Subscriber 处理结果 |
| |
2. 实现
// AbstractCommand.java
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
// ... 省略无关属性与方法
public Observable<R> toObservable() {
return Observable.defer(new Func0<Observable<R>>() {
public Observable<R> call() {
// ....
public Observable<R> observe() {
// us a ReplaySubject to buffer the eagerly subscribed-to Observable
ReplaySubject<R> subject = ReplaySubject.create();
// eagerly kick off subscription
final Subscription sourceSubscription = toObservable().subscribe(subject);
// return the subject that can be subscribed to later while the execution has already started
return subject.doOnUnsubscribe(new Action0() {
public void call() {
// HystrixCommand.java
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
// ... 省略无关属性与方法
public Future<R> queue() {
final Future<R> delegate = toObservable().toBlocking().toFuture();
final Future<R> f = new Future<R>() {
// ... 包装 delegate
// ...
return f;
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
protected abstract R run() throws Exception;
方法 :未做订阅,返回干净的 Observable 。这就是为什么上文说“未调用” 。#observe()
方法 :调用#toObservable()
方法的基础上,向 Observable 注册rx.subjects.ReplaySubject
发起订阅 。ReplaySubject 会发射所有来自原始 Observable 的数据给观察者,无论它们是何时订阅的。感兴趣的同学可以阅读 《ReactiveX/RxJava文档中文版 —— Subject》 。
方法 :调用#toObservable()
方法 :子类实现该方法,执行正常的业务逻辑。Observable#toBlocking()
方法 :将 Observable 转换成阻塞的rx.observables.BlockingObservable
方法 :返回可获得#run()
抽象方法执行结果的 Future 。BlockingObservable 在 「3. BlockingObservable」 详细解析。
方法 :调用#queue()
FROM 《【翻译】Hystrix文档-实现原理》
3. BlockingObservable
本小节为拓展内容,源码解析 RxJava ( 非 Hystrix ) 的 rx.observables.BlockingObservable
1 ) 跳过本小节,不影响对本文的理解。
2 ) 选择阅读 《ReactiveX/RxJava文档中文版 —— 阻塞操作》 ,理解 BlockingObservable 的原理。
3 ) 选择阅读本小节,理解 BlockingObservable 的原理以及实现。
《RxJava 源码解析 —— BlockingObservable》
666. 彩蛋
第一篇 Hystrix 正式的源码解析。
梳理 Hystrix 的源码还是蛮痛苦的,主要是因为对 RxJava 不够熟悉。