如何(不)在Java 9+中使用Reactive Streams
简介
Reactive Streams是以无阻塞背压的流式方式进行异步数据处理的标准。从Java 9开始,它们在java.util.concurrent.Flow.*以接口的形式成为JDK的一部分。
拥有接口可能会使您倾向于编写自己的实现。这看起来很神奇,但这不并是他们存在JDK中的原因。
在本文中,我将描述反应流处理的基本概念,并说明如何不使用JDK 9+中包含的API。此外,我们将考虑未来JDK的Reactive Streams可能支持方向。
题外话
如果您更喜欢观看视频,那么 the third top-rated talk of Riga Dev Days 2016–18是关于这个主题的一次演讲的记录。否则,请继续阅读。
流处理回顾
在大多数人可能至少看过一次的通用流处理架构情况下,列举以下几个主要概念:
数据来源,有时称为生产者(producer)
数据的目的地,有时称为消费者(consumer)
对数据执行某些操作一个或多个处理阶段。
在这样的管道中,数据从生产者开始流经处理阶段最终流向消费者:
广义流处理架构
现在,如果您认为上面的组件可以有不同的处理速度,那么可能有以下两种情况:
如果下游(即接收数据的组件)比上游(发送数据的组件)快,那么一切都很好,因为管道应该能够顺利运行。
然而,如果上游速度更快,那么下游就会数据堆积,事情开始变得糟糕。
在后一种情况下,有几种策略可以处理过多的数据:
缓冲 — 但是缓冲区的容量有限,你迟早会耗尽内存
删除 — 但是你会丢失数据(通常不需要,但在某些情况下可能有意义 - 例如,这就是网络硬件经常做的事情)
阻塞直至消费者完成 - 但这可能会导致整个管道变慢。
处理这些不同处理能力的首选方法是一种称为背压的技术 - 归结为较慢的消费者从较快的生产者请求给定数量的数据 - 但只是消费者能够在那时处理的数量。
回到流式传输流水线图,您可以将背压视为一种特殊的信号数据流向相反的方向(与正在处理的常规数据相比:
背压流式管道
然而,并非每个背压的流式管道都必须是反应式的。
响应式流
Reactive Streams的关键概念是以异步和非阻塞方式无限处理数据流,以便可以并行使用计算资源(可以考虑CPU核心或网络主机)。
响应式流的是三个关键要素:
异步处理数据
非阻塞背压机制
下游可以比上游慢的事实以某种方式在域模型中表示
最后一个要素示例包括Twitter straming API,如果消耗太慢,可以断开连接,或者Akka Streams中的一个内置阶段 - conflate - 这可以让您明确规划缓慢的下游。
JDK支持的响应流
从版本9开始,Reactive Streams接口(以前作为单独的库提供)已成为java.util.concurrent.Flow类中JDK的一部分。
这四个接口乍一看似乎相当简单:
Publisher: 负责发布类型为T的元素,并为订阅者提供连接到它的订阅方法
Subscriber:连接Publisher,通过onSubscribe接收确认,然后通过onNext接收数据,通过onError和onComplete回调和返回其他信号
Subscription:代表发布者和订阅者之间的链接,允许通过请求对发布者进行背压,或者取消终止链接
Processor:在单个接口中结合了Publisher和Subscriber的功能
编码
手头有这么简单的接口可能会诱使你尝试实现它们。例如,您可以编写一个简单的Publisher实现,它发布一个任意的整数迭代器:
public class SimplePublisher implements Flow.Publisher<Integer> {
private final Iterator<Integer> iterator;
SimplePublisher(int count) {
this.iterator = IntStream.rangeClosed(1, count).iterator();
}
@Override
public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
iterator.forEachRemaining(subscriber::onNext);
subscriber.onComplete();
}
}
然后,您可以尝试使用一些只打印出接收数据的虚拟订阅者来运行它:
public static void main(String[] args) {
new SimplePublisher(10).subscribe(new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {}
@Override
public void onNext(Integer item) {
System.out.println("item = [" + item + "]");
}
@Override
public void onError(Throwable throwable) {}
@Override
public void onComplete() {
System.out.println("complete");
}
});
}
运行并检查输出,它应该产生如下结果:
item = [1]
item = [2]
item = [3]
item = [4]
item = [5]
item = [6]
item = [7]
item = [8]
item = [9]
item = [10]
complete
所以,它有效,对吗?看起来确实如此,但你可能会感到有些东西缺失了。例如,发布者不会根据任何需求发出元素,而只是一次性向下游发送元素。
事实证明这种天真的实现远非正确。这可以通过从Reactive Streams TCK运行几个测试来实现。 TCK(或技术兼容性工具包)仅是一个测试框架,用于验证响应式组件的实现在组件之间是否正确交互方面是否正确。其目标是确保所有自定义Reactive Streams实现可以顺利地协同工作 - 通过抽象接口连接 - 同时正确执行所有数据传输,信令和背压。
为SimplePublisher创建测试用例,您需要为构建定义添加适当的依赖项并扩展TCK的FlowPublisherVerification:
public class SimplePublisherTest extends FlowPublisherVerification<Integer> {
public SimplePublisherTest() {
super(new TestEnvironment());
}
@Override
public Flow.Publisher<Integer> createFlowPublisher(long elements) {
return new SimplePublisher((int) elements);
}
@Override
public Flow.Publisher<Integer> createFailedFlowPublisher() {
return null;
}
}
在为简单的发布者运行测试用例之后,您可以看到它确实存在一些问题:
运行TCK发布者测试SimplePublisher的结果
实际上,只有一个测试用例已经通过;其他所有都有问题。这清楚地表明,无价值的实现并不合适。
测试用例名称中的数字指的是Reactive Streams规范中的相应项目,您可以进一步探索这些要求背后的想法。
事实证明,大多数问题可以通过几个小的改变来消除,即:
引入Subscription的实现以将发布者与其订阅者链接起来,订阅者将根据需求发出元素
添加一些基本的错误处理
在订阅中添加一些简单状态以正确处理终止。
有关详细信息,请查阅示例代码存储库中提交的历史记录。
然而,最终,你将会遇到不那么琐碎,难以解决的问题。
由于实现是同步的,因此订阅的request()调用订阅者的onNext()会产生无限递归的问题,其中订阅者再次调用request(),等等。
另一个严重的问题与处理无限需求有关(即订阅者请求Long.MAX_VALUE元素,可能是几次)。如果你在这里不够谨慎,你最终可能会产生太多的线程或者溢出一些长值,你可能会存储累积的需求。
不要在家里尝试这个
上面示例的底线是,响应式组件实际上并不容易实现。因此,除非您正在编写另一个Reactive Streams实现,否则您不应该自己实现它们,而应使用通过TCK验证的现有实现。
如果您决定编写自己的实现,请务必了解规范的所有细节,并记住针对您的代码运行TCK。
新接口的目的
那么,你可能会问自己,有什么接口?将它们包含在JDK中的实际目标是提供称为服务提供者接口(或SPI)层的东西。这最终应该作为具有反应性和流式特性的不同组件的统一层,但可能会暴露自己的自定义API,因此无法与其他类似的实现进行互操作。
另一个同样重要的目标是为JDK的未来发展指出正确的方向,从而导致已经存在于JDK中并广泛使用的现有流抽象使用一些通用接口 - 再一次改善互操作性。
现有的流式抽象
那么JDK中已经存在流式抽象(流媒体意味着处理大量,可能无限的数据块,而无需预先将所有内容读入内存)?其中包括:
java.io.InputStream/OutputStream
java.util.Iterator
java.nio.channels.*
javax.servlet.ReadListener/WriteListener
java.sql.ResultSet
java.util.Stream
java.util.concurrent.Flow.*
虽然上述所有抽象都暴露了某种类似流式的行为,但它们错过了一个可以让您轻松连接它们的通用API,例如:使用Publisher从一个文件和订阅者读取数据以将其写入另一个文件。
拥有这样的统一层的优点是可以使用单个调用:
publisher.subscribe(subscriber)
使用它来处理反应流处理的所有隐藏的复杂性(如背压和信号)。
迈向理想世界
各种抽象使用通用接口的结果可能是什么?让我们来看几个例子。
最小操作集
当前JDK中Reactive Streams仅限于支持前面描述的四个接口。如果您以前曾经使用过一些反应库--Akka Streams,RxJava或Project Reactor - 您就会发现它们的功能在于各种流组合器(如map或filter,来命名最简单的组合),开箱即用。然而,JDK中缺少这些组合器,尽管您可能期望至少有一些组合器可用。
为了解决这个问题,Lightbend提出了一个POC of Reactive Streams Utilities - 一个内置基本操作的库,可以提供更复杂的库,作为委托给现有实现的插件,由JVM系统参数指定像:
-Djava.flow.provider=akka
HTTP
当然,我们如何收到通过HTTP上传的文件并以其他方式上传到其他地方?
从Servlet 3.1版开始,就有了异步Servlet IO。此外,从JDK 9开始,有一个新的HTTP客户端(在Java 9/10的jdk.incubating.httpmodule中,但从Java 11开始被认为是稳定的)。除了更好的API,新客户端还支持Reactive Streams作为输入/输出。其中,它提供POST(Publisher)方法。
现在,如果HttpServletRequest提供了一个发布者来公开请求体,那么上传接收到的文件将变为
POST(BodyPublisher.fromPublisher(req.getPublisher())
这种情况发生在post请求下的所有响应特征 - 只需使用该单行代码即可。
Database Access
当谈到以被动方式访问关系数据库的通用方法时,异步数据库访问API(ADBA)带来了一些希望,遗憾的是,到目前为止它尚未进入JDK。
还有R2DBC--努力将响应式编程API引入关系数据存储。它目前支持H2和Postgres,并与Spring Data JPA很好地配合,这可能有助于更广泛的应用。
然后,也有一些特定于供应商的异步驱动程序。但我们仍然缺少一个完美的解决方案,可以让你做一些事情像:
Publisher<User> users = entityManager
.createQuery("select u from users")
.getResultPublisher()
这基本上是一个普通的旧JPA调用,只是用户的发布者而不是列表。
这仍然不是现实
再次提醒你 - 通过上面的例子可以展望未来。然而他们还没到。 JDK生态系统前进的方向需要社区的时间和努力。
统一层的实际使用
尽管HTTP和数据库的统一还不存在,但已经可以使用JDK中的统一接口实际连接各种Reactive Streams实现。
在这个例子中,我将使用Project Reactor的Flux作为发布者,使用Akka Streams的Flow作为处理器,使用RXJava作为订阅者。注意:下面的示例代码使用Java 10 var s,因此如果您打算自己尝试,请确保使用正确的JDK。
public class IntegrationApp {
public static void main(String[] args) {
var reactorPublisher = reactorPublisher();
var akkaStreamsProcessor = akkaStreamsProcessor();
reactorPublisher.subscribe(akkaStreamsProcessor);
Flowable
.fromPublisher(FlowAdapters.toProcessor(akkaStreamsProcessor))
.subscribe(System.out::println);
}
private static Publisher<Long> reactorPublisher() {
var numberFlux = Flux.interval(Duration.ofSeconds(1));
return JdkFlowAdapter.publisherToFlowPublisher(numberFlux);
}
private static Processor<Long, Long> akkaStreamsProcessor() {
var negatingFlow = Flow.of(Long.class).map(i -> -i);
return JavaFlowSupport.Flow.toProcessor(negatingFlow).run(materializer);
}
private static ActorSystem actorSystem = ActorSystem.create();
private static ActorMaterializer materializer = ActorMaterializer.create(actorSystem);
}
查看main,您可以看到构成管道的组件有三个:reactorPublisher,akkaStreamsProcessor和Flowable,它们打印到标准输出。
当您查看工厂方法的返回类型时,您会注意到它们只不过是常见的Reactive Streams接口(Publisher 和Processor ),它们用于无缝连接不同的实现。
此外,正如您所看到的,各种库不会立即返回统一类型(即它们内部使用不同的类型层次结构),但它们需要一些粘合代码,将其内部类型转换为java.util.concurrent.Flow.的类型 - 类似于JdkFlowAdapter或JavaFlowSupport*。
最后但并非最不重要的是,您可以发现不同库之间在如何公开流引擎内部方面的一些差异。虽然Project Reactor倾向于完全隐藏内部,但另一方面,Akka Streams要求您明确定义一个具体实现 - 流式传输管道的运行时。
总结
以下是本文的几个关键要点:
JDK中的Reactive Streams支持不是规范的完整实现,而只是通用接口,
这些接口用作SPI(服务提供者接口) - 用于不同Reactive Streams实现的统一层,
除非你正在创建一些新的库,否则你自己很难实现这些接口,也不推荐。如果您决定实现它们,请确保TCK的所有测试都是绿色的 - 这很有可能使您的库与其他反应组件一起顺利运行。
如果您想试验TCK和SimplePublisher示例,可以在GitHub上找到该代码。
如果您对深入了解Reactive Streams实现感兴趣,诚挚推荐Advanced Reactive Java博客和SoftwareMill Tech博客来获取更多这样的帖子。
原文链接:https://dzone.com/articles/how-not-to-use-reactive-streams-in-java-9
作者:Jacek Kunicki
译者:Emma
推荐: Spring Cloud Alibaba基础教程:使用Nacos实现服务注册与发现
上一篇:Java中的控制(耦合)反转
关注公众号
点击原文阅读更多