Reactive 架构才是未来
阿里妹导读:Reactive 编程模型有哪些价值?它的原理是什么?如何正确使用?本文作者将根据他学习和使用的经历,分享 Reactive 的概念、规范、价值和原理。欢迎同学们共同探讨、斧正。
文末福利:Java 系列直播,畅谈架构、Reactive Spring、DDD 和高可用。
public static void main(String[] args) {
FluxProcessor<Integer, Integer> publisher = UnicastProcessor.create();
publisher.doOnNext(event -> System.out.println("receive event: " + event)).subscribe();
publisher.onNext(1); // print 'receive event: 1'
publisher.onNext(2); // print 'receive event: 2'
}
上面描述有很多专有名词,可能有些疑惑,可以看下相关名词解释。
为什么使用 Reactive 方式构建的系统会具有以上价值, 我稍后在 Reactor 章节中介绍。
The purpose of Reactive Streams is to provide a standard for asynchronous stream processing with non-blocking backpressure.
Publisher
产生一个数据流(可能包含无限数据), Subscriber 们可以根据它们的需要消费这些数据。
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
Subscriber
Publisher 创建的元素的接收者。监听指定的事件,例如 OnNext, OnComplete, OnError 等。
publicinterface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
Subscription
是 Publisher 和 Subscriber 一对一的协调对象。Subscriber 可以通过它来向 Publisher 取消数据发送或者 request 更多数据。
public interface Subscription {
public void request(long n);
public void cancel();
}
Processor
同时具备 Publisher 和 Subscriber 特征。代码1中 FluxProcessor 既可以发送数据(OnNext),也可以接收数据 (doOnNext)。
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
同步方式一般通过多线程来提高性能,但系统可创建的线程数是有限的,且线程多以后造成线程切换开销。
同步方式很难进一步提升资源利用率。
同步调用依赖的系统出现问题时,自身稳定性也会受到影响。
thread 不是非常轻量(相比下面几种实现方案)。
thread 数量是有限的,最终可能会成为主要瓶颈。
有一些平台可能不支持多线程。例如:JavaScript。
调试,实现上有一定复杂性。
多层嵌套 callback 比较复杂,容易形成"圣诞树" (callback hell)。
错误处理比较复杂。
多用于 event loop 架构的语言中,例如:JavaScript。
无法逻辑组合各种行为,支持业务场景有限。
错误处理依然复杂。
和 Future 很相似。Future 可以认为返回一个独立的元素,而 Rx 返回一个可以被订阅的 Stream。
多平台支持同一套规范。
同一套 API 同时支持异步、同步。
错误处理方便。
kotlin coroutine 和 goroutine 在语法层面上提供异步支持, 而且比Rx更简洁,但无法跨多个语言平台形成统一的规范。
回弹性 (Resilient):当函数出现严重超时时 (rt >= 10s),函数上游的 broker, gateway 应用几乎无任何影响。
及时响应性:不管是高并发场景(资源足够),还是正常场景,RT 表现一致。
涉及到 IO 的地方几乎全异步化。例如中间件(HSF, MetaQ 等提供异步 API)调用。
IO 线程模型变化。使用较少(一般 CPU 核数)线程处理所有的请求。
// 非阻塞读取客户端请求数据(in), 读取成功后执行lambda.
inChannel.read(in) {
workerThreadPool.execute{
// 阻塞处理业务逻辑(process), 业务逻辑在worker线程池中执行,同步执行完后,再向客户端返回输出(out)
val out = process(in)
outChannel.write(out)
}
}
// 非阻塞读取客户端请求数据(in), 读取成功后执行lambda
inChannel.read(in) {
// IO线程执行业务逻辑(process), 然后向客户端返回输出(out). 这要求业务处理流程必须是非阻塞的.
process(in){ out->
outChannel.write(out) {
// this lambda is executed when the writing completes
...
}
}
}
Reactor 基础文档
Reactive Streams 规范文档
Operator
参考
https://www.reactivemanifesto.org/
https://www.reactive-streams.org/
https://kotlinlang.org/docs/tutorials/coroutines/async-programming.html
https://projectreactor.io/docs/core/release/reference/index.html
Java 系列直播
Reactive 将对架构设计以及开发方式带来什么样的改变?Java 下 DDD 应该怎么落地?Java 性能优化怎么做?全球顶尖 Java 开发者 & 布道师坐镇,聚焦前沿技术以及全新云原生开发方式,中国最大 Java 用户组联手 Spring 发起 Java 系列直播,与你畅谈。
识别下方二维码,或点击文末“阅读原文”立即收看:
推荐阅读