查看原文
其他

Spring Cloud Stream知识点盘点

itmuch IT牧场 2021-08-10

点击上方"IT牧场",选择"设为星标"技术干货每日送达!

前面,已经探讨了:

Spring Cloud Stream实现消息过滤消费Spring Cloud Stream 错误处理详解

本文对Spring Cloud Stream,做一个知识点盘点和总结,包括:

•概念•Stream注解•Spring Cloud Integration(Spring Cloud Stream的底层)注解•Spring Messaging(Spring消息编程模型)注解•Spring Cloud Stream API

概念

group

组内只有1个实例消费。如果不设置group,则stream会自动为每个实例创建匿名且独立的group——于是每个实例都会消费。

组内单次只有1个实例消费,并且会轮询负载均衡。通常,在将应用程序绑定到给定目标时,最好始终指定consumer group。

destination binder

与外部消息系统通信的组件,为构造 Binding提供了 2 个方法,分别是 bindConsumer 和 bindProducer ,它们分别用于构造生产者和消费者。Binder使Spring Cloud Stream应用程序可以灵活地连接到中间件,目前spring为kafka、rabbitmq提供binder。

destination binding

Binding 是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产,由binder创建。

partition

TIPS

严格来说这个不是概念,而是一种Stream提高伸缩性、吞吐量的一种方式。不过不想另起标题了,写在这里吧。

一个或多个生产者将数据发送到多个消费者,并确保有共同特征标识的数据由同一个消费者处理。默认是对消息进行hashCode,然后根据分区个数取余,所以对于相同的消息,总会落到同一个消费者上。

注解

Input(Stream)

示例:

public interface Barista { @Input("inboundOrders") SubscribableChannel orders();}

作用:

•用于接收消息•为每个binding生成channel实例•指定channel名称•在spring容器中生成一个名为inboundOrders,类型为SubscribableChannel的bean•在spring容器中生成一个类,实现Barista接口。

Output(Stream)

示例:

public interface Source { @Output MessageChannel output();}

作用:

类似Input,只是用来生产消息。

StreamListener(Stream)

示例:

@StreamListener(value = Sink.INPUT, condition = "headers['type']=='dog'")public void handle(String body) { System.out.println("Received: " + body);}
@Bean@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "2"))public MessageSource<String> test() { return () -> { Map<String, Object> map = new HashMap<>(1); map.put("type", "dog"); return new GenericMessage<>("abcdef", map); };}

作用:

用于消费消息

condition的作用:符合条件,才进入处理方法。

condition起作用的两个条件:

•注解的方法没有返回值•方法是一个独立方法,不支持Reactive API

SendTo(messaging)

示例:

// 接收INPUT这个channel的消息,并将返回值发送到OUTPUT这个channel@StreamListener(Sink.INPUT)@SendTo(Source.OUTPUT)public String receive(String receiveMsg) { return "handle...";}

作用:

用于发送消息

InboundChannelAdapter(Integration)

示例:

@Bean@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1"))public MessageSource<String> test() { return () -> new GenericMessage<>("Hello Spring Cloud Stream");}

作用:

表示让定义的方法生产消息。

注:用 InboundChannelAdapter 注解的方法上即使有参数也没用。即下面test方法不要有参数。

•fixedDelay:多少毫秒发送1次•maxMessagesPerPoll:一次发送几条消息。

ServiceActivator(Integration)

示例:

@ServiceActivator(inputChannel = Sink.INPUT, outputChannel = Source.OUTPUT)public String transform(String payload) { return payload.toUpperCase();}

作用:

表示方法能够处理消息或消息有效内容,监听input消息,用方法体的代码处理,然后输出到output中。

Transformer(Integration)

示例:

@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)public Object transform(String message) { return message.toUpperCase();}

作用:

和 ServiceActivator 类似,表示方法能够转换消息,消息头,或消息有效内容

PollableMessageSource(Stream)

示例代码:

@SpringBootApplication@EnableBinding({ConsumerApplication.PolledProcessor.class})@EnableSchedulingpublic class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); }
@Autowired private PolledProcessor polledProcessor;
@Scheduled(fixedDelay = 5_000) public void poll() { polledProcessor.input().poll(message -> { byte[] bytes = (byte[]) message.getPayload(); String payload = new String(bytes); System.out.println(payload); }); }
public interface PolledProcessor { @Input PollableMessageSource input();
@Output MessageChannel output(); }
@Bean @InboundChannelAdapter(value = "output", poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1")) public MessageSource<String> test() { return () -> { Map<String, Object> map = new HashMap<>(1); map.put("type", "dog"); return new GenericMessage<>("adfdfdsafdsfa", map); }; }}

如果不想自己做byte数组转换,可以添加配置:

spring: cloud: stream: bindings: output: # 指定content-type content-type: text/plain

作用:

允许消费者控制消费速率。

相关文章:

https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers[1]

干货分享

最近将个人学习笔记整理成册,使用PDF分享。关注我,回复如下代码,即可获得百度盘地址,无套路领取!

•001:《Java并发与高并发解决方案》学习笔记;•002:《深入JVM内核——原理、诊断与优化》学习笔记;•003:《Java面试宝典》•004:《Docker开源书》•005:《Kubernetes开源书》•006:《DDD速成(领域驱动设计速成)》•007:全部•008:加技术讨论群

近期热文

亚马逊实践领域驱动设计之道缓存使用过程中的几种策略总结及优缺点组合分析秒懂 QPS、TPS、PV、UV、GMV、IP、RPS!Spring Cloud Stream 错误处理详解多账户的统一登录 实现全过程Spring Cloud Stream实现消息过滤消费

References

[1]https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers


想知道更多?长按/扫码关注我吧↓↓↓>>>技术讨论群<<<喜欢就点个"在看"呗^_^

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存