查看原文
其他

如何(不)在Java 9+中使用Reactive Streams

影宸风洛 SpringForAll社区 2020-10-17

简介

Reactive Streams是以无阻塞背压的流式方式进行异步数据处理的标准。从Java 9开始,它们在java.util.concurrent.Flow.*以接口的形式成为JDK的一部分。

拥有接口可能会使您倾向于编写自己的实现。这看起来很神奇,但这不并是他们存在JDK中的原因。

在本文中,我将描述反应流处理的基本概念,并说明如何不使用JDK 9+中包含的API。此外,我们将考虑未来JDK的Reactive Streams可能支持方向。

题外话

如果您更喜欢观看视频,那么 the third top-rated talk of Riga Dev Days 2016–18是关于这个主题的一次演讲的记录。否则,请继续阅读。

流处理回顾

在大多数人可能至少看过一次的通用流处理架构情况下,列举以下几个主要概念:

  • 数据来源,有时称为生产者(producer)

  • 数据的目的地,有时称为消费者(consumer)

  • 对数据执行某些操作一个或多个处理阶段。

在这样的管道中,数据从生产者开始流经处理阶段最终流向消费者:

  1. 广义流处理架构

现在,如果您认为上面的组件可以有不同的处理速度,那么可能有以下两种情况:

  1. 如果下游(即接收数据的组件)比上游(发送数据的组件)快,那么一切都很好,因为管道应该能够顺利运行。

  2. 然而,如果上游速度更快,那么下游就会数据堆积,事情开始变得糟糕。

在后一种情况下,有几种策略可以处理过多的数据:

  1. 缓冲  — 但是缓冲区的容量有限,你迟早会耗尽内存

  2. 删除  — 但是你会丢失数据(通常不需要,但在某些情况下可能有意义 - 例如,这就是网络硬件经常做的事情)

  3. 阻塞直至消费者完成 - 但这可能会导致整个管道变慢。

处理这些不同处理能力的首选方法是一种称为背压的技术 - 归结为较慢的消费者从较快的生产者请求给定数量的数据 - 但只是消费者能够在那时处理的数量。

回到流式传输流水线图,您可以将背压视为一种特殊的信号数据流向相反的方向(与正在处理的常规数据相比:

  1. 背压流式管道

然而,并非每个背压的流式管道都必须是反应式的。

响应式流

Reactive Streams的关键概念是以异步和非阻塞方式无限处理数据流,以便可以并行使用计算资源(可以考虑CPU核心或网络主机)。

响应式流的是三个关键要素:

  • 异步处理数据

  • 非阻塞背压机制

  • 下游可以比上游慢的事实以某种方式在域模型中表示

最后一个要素示例包括Twitter straming API,如果消耗太慢,可以断开连接,或者Akka Streams中的一个内置阶段 - conflate - 这可以让您明确规划缓慢的下游。

JDK支持的响应流

从版本9开始,Reactive Streams接口(以前作为单独的库提供)已成为java.util.concurrent.Flow类中JDK的一部分。

这四个接口乍一看似乎相当简单:

  • Publisher: 负责发布类型为T的元素,并为订阅者提供连接到它的订阅方法

  • Subscriber:连接Publisher,通过onSubscribe接收确认,然后通过onNext接收数据,通过onError和onComplete回调和返回其他信号

  • Subscription:代表发布者和订阅者之间的链接,允许通过请求对发布者进行背压,或者取消终止链接

  • Processor:在单个接口中结合了Publisher和Subscriber的功能

编码

手头有这么简单的接口可能会诱使你尝试实现它们。例如,您可以编写一个简单的Publisher实现,它发布一个任意的整数迭代器:

  1. public class SimplePublisher implements Flow.Publisher<Integer> {

  2. private final Iterator<Integer> iterator;

  3. SimplePublisher(int count) {

  4. this.iterator = IntStream.rangeClosed(1, count).iterator();

  5. }

  6. @Override

  7. public void subscribe(Flow.Subscriber<? super Integer> subscriber) {

  8. iterator.forEachRemaining(subscriber::onNext);

  9. subscriber.onComplete();

  10. }

  11. }

然后,您可以尝试使用一些只打印出接收数据的虚拟订阅者来运行它:

  1. public static void main(String[] args) {

  2. new SimplePublisher(10).subscribe(new Flow.Subscriber<>() {

  3. @Override

  4. public void onSubscribe(Flow.Subscription subscription) {}

  5. @Override

  6. public void onNext(Integer item) {

  7. System.out.println("item = [" + item + "]");

  8. }

  9. @Override

  10. public void onError(Throwable throwable) {}

  11. @Override

  12. public void onComplete() {

  13. System.out.println("complete");

  14. }

  15. });

  16. }

运行并检查输出,它应该产生如下结果:

  1. item = [1]

  2. item = [2]

  3. item = [3]

  4. item = [4]

  5. item = [5]

  6. item = [6]

  7. item = [7]

  8. item = [8]

  9. item = [9]

  10. item = [10]

  11. complete

所以,它有效,对吗?看起来确实如此,但你可能会感到有些东西缺失了。例如,发布者不会根据任何需求发出元素,而只是一次性向下游发送元素。

事实证明这种天真的实现远非正确。这可以通过从Reactive Streams TCK运行几个测试来实现。 TCK(或技术兼容性工具包)仅是一个测试框架,用于验证响应式组件的实现在组件之间是否正确交互方面是否正确。其目标是确保所有自定义Reactive Streams实现可以顺利地协同工作 - 通过抽象接口连接 - 同时正确执行所有数据传输,信令和背压。

SimplePublisher创建测试用例,您需要为构建定义添加适当的依赖项并扩展TCK的FlowPublisherVerification

  1. public class SimplePublisherTest extends FlowPublisherVerification<Integer> {

  2. public SimplePublisherTest() {

  3. super(new TestEnvironment());

  4. }

  5. @Override

  6. public Flow.Publisher<Integer> createFlowPublisher(long elements) {

  7. return new SimplePublisher((int) elements);

  8. }

  9. @Override

  10. public Flow.Publisher<Integer> createFailedFlowPublisher() {

  11. return null;

  12. }

  13. }

在为简单的发布者运行测试用例之后,您可以看到它确实存在一些问题:

运行TCK发布者测试SimplePublisher的结果

实际上,只有一个测试用例已经通过;其他所有都有问题。这清楚地表明,无价值的实现并不合适。

测试用例名称中的数字指的是Reactive Streams规范中的相应项目,您可以进一步探索这些要求背后的想法。

事实证明,大多数问题可以通过几个小的改变来消除,即:

  • 引入Subscription的实现以将发布者与其订阅者链接起来,订阅者将根据需求发出元素

  • 添加一些基本的错误处理

  • 在订阅中添加一些简单状态以正确处理终止。

有关详细信息,请查阅示例代码存储库中提交的历史记录。

然而,最终,你将会遇到不那么琐碎,难以解决的问题。

由于实现是同步的,因此订阅的request()调用订阅者的onNext()会产生无限递归的问题,其中订阅者再次调用request(),等等。

另一个严重的问题与处理无限需求有关(即订阅者请求Long.MAX_VALUE元素,可能是几次)。如果你在这里不够谨慎,你最终可能会产生太多的线程或者溢出一些长值,你可能会存储累积的需求。

不要在家里尝试这个

上面示例的底线是,响应式组件实际上并不容易实现。因此,除非您正在编写另一个Reactive Streams实现,否则您不应该自己实现它们,而应使用通过TCK验证的现有实现。

如果您决定编写自己的实现,请务必了解规范的所有细节,并记住针对您的代码运行TCK。

新接口的目的

那么,你可能会问自己,有什么接口?将它们包含在JDK中的实际目标是提供称为服务提供者接口(或SPI)层的东西。这最终应该作为具有反应性和流式特性的不同组件的统一层,但可能会暴露自己的自定义API,因此无法与其他类似的实现进行互操作。

另一个同样重要的目标是为JDK的未来发展指出正确的方向,从而导致已经存在于JDK中并广泛使用的现有流抽象使用一些通用接口 - 再一次改善互操作性。

现有的流式抽象

那么JDK中已经存在流式抽象(流媒体意味着处理大量,可能无限的数据块,而无需预先将所有内容读入内存)?其中包括:

  • java.io.InputStream/OutputStream

  • java.util.Iterator

  • java.nio.channels.*

  • javax.servlet.ReadListener/WriteListener

  • java.sql.ResultSet

  • java.util.Stream

  • java.util.concurrent.Flow.*

虽然上述所有抽象都暴露了某种类似流式的行为,但它们错过了一个可以让您轻松连接它们的通用API,例如:使用Publisher从一个文件和订阅者读取数据以将其写入另一个文件。

拥有这样的统一层的优点是可以使用单个调用:

  1. publisher.subscribe(subscriber)

使用它来处理反应流处理的所有隐藏的复杂性(如背压和信号)。

迈向理想世界

各种抽象使用通用接口的结果可能是什么?让我们来看几个例子。

最小操作集

当前JDK中Reactive Streams仅限于支持前面描述的四个接口。如果您以前曾经使用过一些反应库--Akka Streams,RxJava或Project Reactor - 您就会发现它们的功能在于各种流组合器(如mapfilter,来命名最简单的组合),开箱即用。然而,JDK中缺少这些组合器,尽管您可能期望至少有一些组合器可用。

为了解决这个问题,Lightbend提出了一个POC of Reactive Streams Utilities - 一个内置基本操作的库,可以提供更复杂的库,作为委托给现有实现的插件,由JVM系统参数指定像:

  1. -Djava.flow.provider=akka

HTTP

当然,我们如何收到通过HTTP上传的文件并以其他方式上传到其他地方?

从Servlet 3.1版开始,就有了异步Servlet IO。此外,从JDK 9开始,有一个新的HTTP客户端(在Java 9/10的jdk.incubating.httpmodule中,但从Java 11开始被认为是稳定的)。除了更好的API,新客户端还支持Reactive Streams作为输入/输出。其中,它提供POST(Publisher)方法。

现在,如果HttpServletRequest提供了一个发布者来公开请求体,那么上传接收到的文件将变为

  1. POST(BodyPublisher.fromPublisher(req.getPublisher())

这种情况发生在post请求下的所有响应特征 - 只需使用该单行代码即可。

Database Access

当谈到以被动方式访问关系数据库的通用方法时,异步数据库访问API(ADBA)带来了一些希望,遗憾的是,到目前为止它尚未进入JDK。

还有R2DBC--努力将响应式编程API引入关系数据存储。它目前支持H2和Postgres,并与Spring Data JPA很好地配合,这可能有助于更广泛的应用。

然后,也有一些特定于供应商的异步驱动程序。但我们仍然缺少一个完美的解决方案,可以让你做一些事情像:

  1. Publisher<User> users = entityManager

  2. .createQuery("select u from users")

  3. .getResultPublisher()

这基本上是一个普通的旧JPA调用,只是用户的发布者而不是列表。

这仍然不是现实

再次提醒你 - 通过上面的例子可以展望未来。然而他们还没到。 JDK生态系统前进的方向需要社区的时间和努力。

统一层的实际使用

尽管HTTP和数据库的统一还不存在,但已经可以使用JDK中的统一接口实际连接各种Reactive Streams实现。

在这个例子中,我将使用Project Reactor的Flux作为发布者,使用Akka Streams的Flow作为处理器,使用RXJava作为订阅者。注意:下面的示例代码使用Java 10 var s,因此如果您打算自己尝试,请确保使用正确的JDK。

  1. public class IntegrationApp {

  2. public static void main(String[] args) {

  3. var reactorPublisher = reactorPublisher();

  4. var akkaStreamsProcessor = akkaStreamsProcessor();

  5. reactorPublisher.subscribe(akkaStreamsProcessor);

  6. Flowable

  7. .fromPublisher(FlowAdapters.toProcessor(akkaStreamsProcessor))

  8. .subscribe(System.out::println);

  9. }

  10. private static Publisher<Long> reactorPublisher() {

  11. var numberFlux = Flux.interval(Duration.ofSeconds(1));

  12. return JdkFlowAdapter.publisherToFlowPublisher(numberFlux);

  13. }

  14. private static Processor<Long, Long> akkaStreamsProcessor() {

  15. var negatingFlow = Flow.of(Long.class).map(i -> -i);

  16. return JavaFlowSupport.Flow.toProcessor(negatingFlow).run(materializer);

  17. }

  18. private static ActorSystem actorSystem = ActorSystem.create();

  19. private static ActorMaterializer materializer = ActorMaterializer.create(actorSystem);

  20. }

查看main,您可以看到构成管道的组件有三个:reactorPublisher,akkaStreamsProcessorFlowable,它们打印到标准输出。

当您查看工厂方法的返回类型时,您会注意到它们只不过是常见的Reactive Streams接口(PublisherProcessor ),它们用于无缝连接不同的实现。

此外,正如您所看到的,各种库不会立即返回统一类型(即它们内部使用不同的类型层次结构),但它们需要一些粘合代码,将其内部类型转换为java.util.concurrent.Flow.的类型 - 类似于JdkFlowAdapterJavaFlowSupport*。

最后但并非最不重要的是,您可以发现不同库之间在如何公开流引擎内部方面的一些差异。虽然Project Reactor倾向于完全隐藏内部,但另一方面,Akka Streams要求您明确定义一个具体实现 - 流式传输管道的运行时。

总结

以下是本文的几个关键要点:

  • JDK中的Reactive Streams支持不是规范的完整实现,而只是通用接口,

  • 这些接口用作SPI(服务提供者接口) - 用于不同Reactive Streams实现的统一层,

  • 除非你正在创建一些新的库,否则你自己很难实现这些接口,也不推荐。如果您决定实现它们,请确保TCK的所有测试都是绿色的 - 这很有可能使您的库与其他反应组件一起顺利运行。

如果您想试验TCK和SimplePublisher示例,可以在GitHub上找到该代码。

如果您对深入了解Reactive Streams实现感兴趣,诚挚推荐Advanced Reactive Java博客和SoftwareMill Tech博客来获取更多这样的帖子。

原文链接:https://dzone.com/articles/how-not-to-use-reactive-streams-in-java-9

作者:Jacek Kunicki

译者:Emma


推荐: Spring Cloud Alibaba基础教程:使用Nacos实现服务注册与发现

上一篇:Java中的控制(耦合)反转




 关注公众号

点击原文阅读更多



    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存