【SFA官方翻译】:Spring集成Java DSL
原文链接:https://www.baeldung.com/spring-integration-java-dsl
作者:Dhawal Kapil
译者:Emma
1 简介
在本篇文章中,我们将了解用于创建应用程序集成的Spring Integration Java DSL。
我们将采用我们在Spring Integration简介中构建的文件移动集成,而不是使用DSL。
2 依赖jar
Spring Integration Java DSL是Spring Integration Core的一部分。
所以,我们添加依赖:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>5.0.6.RELEASE</version>
</dependency>
为了让文件移动应用程序能正常运行,我们还需要Spring Integration File:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
<version>5.0.6.RELEASE</version>
</dependency>
3 Spring集成Java DSL
在Java DSL之前,用户将使用XML配置Spring Integration组件。
DSL引入了一些流畅的构建器,我们可以轻松地创建完全Java的Spring Integration管道。
所以,假设我们想要创建一个通道来对通过管道传输的任何数据进行大写.
在过去,我们是这样做的:
<int:channel id="input"/>
<int:transformer input-channel="input" expression="payload.toUpperCase()" />
而现在我们可以这样做:
@Bean
public IntegrationFlow upcaseFlow() {
return IntegrationFlows.from("input")
.transform(String::toUpperCase)
.get();
}
4 文件移动App
文件移动集成开始之前,我们还需要一些简单的组件。
4.1 集成流
我们需要的第一个组件是集成流(integration flow),我们可以从IntegrationFlows构建器获取:
IntegrationFlows.from(...)
from可以采用多种类型,但在本教程中,我们将只看这三个:
MessageSources
MessageChannels, and
Strings
我们很快就会讨论这三种类型。
在我们调用from之后,我们可以使用一些自定义方法:
IntegrationFlow flow = IntegrationFlows.from(sourceDirectory())
.filter(onlyJpgs())
.handle(targetDirectory())
// add more components
.get();
最终,IntegrationFlows将始终生成IntegrationFlow的实例,这是任何Spring Integration应用程序的最终产品。
这种获取输入,执行适当的转换以及发布结果的模式是所有Spring Integration应用程序的基础。
4.2 描述输入源
首先,我们需要向我们的集成流指出应该在哪里查找移动的文件内容,为此,我们需要一个MessageSource:
@Bean
public MessageSource<File> sourceDirectory() {
// .. create a message source
}
简而言之,MessageSource是一个接收应用程序外部消息的地方。
更具体地说,我们需要能够将外部源调整为Spring消息传递表示的东西。由于这种调整主要集中在输入上,因此通常称为输入通道适配器( Input Channel Adapters)。
spring-integration-file依赖为我们提供了一个输入通道适配器,它非常适合我们的用例:FileReadingMessageSource:
@Bean
public MessageSource<File> sourceDirectory() {
FileReadingMessageSource messageSource = new FileReadingMessageSource();
messageSource.setDirectory(new File(INPUT_DIR));
return messageSource;
}
这儿,FileReadingMessageSource将读取INPUT_DIR给出的目录,并将从中创建一个MessageSource。
让我们在IntegrationFlows.from调用中指定它作为我们的源:
IntegrationFlows.from(sourceDirectory());
4.3 配置输入来源
现在,如果我们将此视为一个长期存在的应用程序,我们可能希望能够在文件进入时注意到这些文件,而不仅仅是移动启动时已存在的文件。
为了方便这一点,from还可以将额外的配置器作为进一步定制的输入源:
IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(10000)));
在这种情况下,我们可以告诉Spring Integration在这种情况下每10秒轮询一次源 - 我们的文件系统,从而使我们的输入源更具灵活性。
当然,这不仅仅适用于我们的文件输入源,我们可以将此轮询器添加到任何MessageSource。
4.4 过滤输入源消息
接下来,我们假设希望文件移动应用程序仅移动特定文件,例如具有jpg扩展名的图像文件。
为此,我们可以使用GenericSelector:
@Bean
public GenericSelector<File> onlyJpgs() {
return new GenericSelector<File>() {
@Override
public boolean accept(File source) {
return source.getName().endsWith(".jpg");
}
};
}
那么,让我们再次更新集成流:
IntegrationFlows.from(sourceDirectory())
.filter(onlyJpgs());
或者,因为这个过滤器非常简单,我们可以使用lambda来定义它:
IntegrationFlows.from(sourceDirectory())
.filter(source -> ((File) source).getName().endsWith(".jpg"));
4.5 使用服务激活器处理消息
现在我们有一个已过滤的文件列表,我们需要将它们写入新位置。
当我们考虑Spring Integration中的输出时,我们会转向Service Activators。
我们使用spring-integration-file中的FileWritingMessageHandler服务激活器:
@Bean
public MessageHandler targetDirectory() {
FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR));
handler.setFileExistsMode(FileExistsMode.REPLACE);
handler.setExpectReply(false);
return handler;
}
在这里,FileWritingMessageHandler会将它收到的每个Message有效负载写入OUTPUT_DIR。
我们又再次更新:
IntegrationFlows.from(sourceDirectory())
.filter(onlyJpgs())
.handle(targetDirectory());
顺便提一下,请注意setExpectReply的用法。由于集成流可以是双向的,因此调用表明此特定管道是单向管道。
4.6 激活集成流
当我们添加了所有组件时,我们需要将IntegrationFlow注册为bean来激活它:
@Bean
public IntegrationFlow fileMover() {
return IntegrationFlows.from(sourceDirectory(), c -> c.poller(Pollers.fixedDelay(10000)))
.filter(onlyJpgs())
.handle(targetDirectory())
.get();
}
get方法获得需要注册为Spring Bean的IntegrationFlow实例。
应用程序上下文加载后,IntegrationFlow中包含的所有组件都会被激活。
现在,应用程序开始将文件从源目录移动到目标目录。
5 附加组件
在我们基于DSL的文件移动应用程序中,我们创建了入站通道适配器(Inbound Channel Adapter),消息过滤器(Message Filter)和服务激活器(Service Activator)。
让我们看一些其他常见的Spring Integration组件,看看我们如何使用它们。
5.1 消息通道
如前所述,Message Channel是初始化流的另一种方法:
IntegrationFlows.from("anyChannel")
我们可以将其视为“请查找或创建名为anyChannel的通道bean。然后,从其他流中读取任何传送到anyChannel的数据。“
但是,它确实更具通用性。
简而言之,通道将生产者从消费者中抽象出来,我们可以将其视为Java Queue。通道可以在流中的任何点插入。
例如,假设我们希望在文件从一个目录移动到下一个目录时对其进行优先级排序:
@Bean
public PriorityChannel alphabetically() {
return new PriorityChannel(1000, (left, right) ->
((File)left.getPayload()).getName().compareTo(
((File)right.getPayload()).getName()));
}
然后,我们可以在流之间插入一个调用通道:
@Bean
public IntegrationFlow fileMover() {
return IntegrationFlows.from(sourceDirectory())
.filter(onlyJpgs())
.channel("alphabetically")
.handle(targetDirectory())
.get();
}
有许多渠道可供选择,其中一些更方便的是用于并发,审计或中间持久性(想想Kafka或JMS缓冲区)。
此外,通道与桥接结合使用时,会变得很强大。
5.2 网桥
当我们想要组合两个通道时,我们使用Bridge。
让我们想象一下,我们不是直接写入输出目录,而是让我们的文件移动应用程序写入另一个通道:
@Bean
public IntegrationFlow fileReader() {
return IntegrationFlows.from(sourceDirectory())
.filter(onlyJpgs())
.channel("holdingTank")
.get();
}
现在,因为我们只是将它写入一个通道,我们可以从那里桥接到其他流。
让我们创建一个网桥,轮询我们的存储器以获取消息并将它们写入目的地:
@Bean
public IntegrationFlow fileWriter() {
return IntegrationFlows.from("holdingTank")
.bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20)))
.handle(targetDirectory())
.get();
}
同样,因为我们写了一个中间通道,现在我们可以添加另一个流,它获取这些相同的文件并以不同的速率写入它们:
@Bean
public IntegrationFlow anotherFileWriter() {
return IntegrationFlows.from("holdingTank")
.bridge(e -> e.poller(Pollers.fixedRate(2, TimeUnit.SECONDS, 10)))
.handle(anotherTargetDirectory())
.get();
}
我们可以看到,各个网桥可以控制不同处理程序的轮询配置。
应用程序上下文加载后,会有一个更复杂的应用程序,它开始将文件从源目录移动到两个目标目录。
6 总结
在本文中,我们看到了使用Spring Integration Java DSL构建不同集成管道的各种方法。
实质上,我们能用之前的教程重新创建文件移动应用程序,但这次使用的是纯Java。
此外,我们还看了一些其他组件,如通道和网桥。
Github上提供了本教程中使用的完整源代码。