查看原文
其他

Spring Cloud Stream使用细节

悟空 牧码小子 2019-04-03

上篇文章我们看了Spring Cloud Stream的基本使用,小伙伴们对Spring Cloud Stream应该也有了一个基本的了解,但是上篇文章中的消息我们是从RabbitMQ的web管理页面发来的,如果我们想要从代码中发送消息呢?本文我们就来看看Spring Cloud Stream的一些使用细节。


本文是Spring Cloud系列的第三十篇文章,了解前二十九篇文章内容有助于更好的理解本文:

1.使用Spring Cloud搭建服务注册中心
2.使用Spring Cloud搭建高可用服务注册中心
3.Spring Cloud中服务的发现与消费
4.Eureka中的核心概念
5.什么是客户端负载均衡
6.Spring RestTemplate中几种常见的请求方式
7.RestTemplate的逆袭之路,从发送请求到负载均衡
8.Spring Cloud中负载均衡器概览
9.Spring Cloud中的负载均衡策略
10.Spring Cloud中的断路器Hystrix
11.Spring Cloud自定义Hystrix请求命令
12.Spring Cloud中Hystrix的服务降级与异常处理
13.Spring Cloud中Hystrix的请求缓存
14.Spring Cloud中Hystrix的请求合并
15.Spring Cloud中Hystrix仪表盘与Turbine集群监控
16.Spring Cloud中声明式服务调用Feign
17.Spring Cloud中Feign的继承特性
18.Spring Cloud中Feign配置详解
19.Spring Cloud中的API网关服务Zuul
20.Spring Cloud Zuul中路由配置细节
21.Spring Cloud Zuul中异常处理细节
22.分布式配置中心Spring Cloud Config初窥
23.Spring Cloud Config服务端配置细节(一)
24.Spring Cloud Config服务端配置细节(二)之加密解密
25.Spring Cloud Config客户端配置细节
26.Spring Cloud Bus之RabbitMQ初窥
27.Spring Cloud Bus整合RabbitMQ
28.Spring Cloud Bus整合Kafka
29.Spring Cloud Stream初窥  


自定义消息通道

上篇文章我们提到了Sink和Source两个接口,这两个接口中分别定义了输入通道和输出通道,而Processor通过继承Source和Sink,同时具有输入通道和输出通道。这里我们就模仿Sink和Source,来定义一个自己的消息通道。

还是在上文的基础上,首先我们定义一个接口叫做MySink,如下:

public interface MySink {    String INPUT = "mychannel";    @Input(INPUT)    SubscribableChannel input(); }

这里我们定义了一个名为mychannel的消息输入通道,@Input注解的参数则表示了消息通道的名称,同时我们还定义了一个方法返回一个SubscribableChannel对象,该对象用来维护消息通道订阅者。然后,我们再定义一个名为MySource的接口,如下:

public interface MySource {    @Output(MySink.INPUT)    MessageChannel output(); }

@Output注解中描述了消息通道的名称,还是mychannel,然后这里我们也定义了一个返回MessageChannel对象的方法,该对象中有一个向消息通道发送消息的方法。

最后我们定义一个消息接收类,如下:

@EnableBinding(value = {MySink.class}) public class SinkReceiver2 {    private static Logger logger = LoggerFactory.getLogger(StreamHelloApplication.class);    @StreamListener(MySink.INPUT)    public void receive(Object playload) {        logger.info("Received:" + playload);    } }

OK,我们在这里绑定消息通道,然后监听自定义的消息通道,最后来一个单元测试测试一下,如下:

@RunWith(SpringJUnit4ClassRunner.class) @WebAppConfiguration @SpringBootTest(classes = StreamHelloApplication.class) @EnableBinding(MySource.class) public class StreamHelloApplicationTests {    @Autowired    private MySource mySource;    @Test    public void contextLoads() {        mySource.output().send(MessageBuilder.withPayload("hello 123").build());    } }

运行单元测试,我们可以看到如下日志,表示消息发送成功了:

 

如果想要发送对象也可以直接发送,不用进行对象转换,如下:

发送:

Book book = new Book(1l, "三国演义", "罗贯中"); mySource.output().send(MessageBuilder.withPayload(book).build());

接收:

@StreamListener(MySink.INPUT) public void receive(Book playload) {    logger.info("Received:" + playload); }

如果我们想要在接收成功后给一个回执,也是OK的,如下:

@StreamListener(MySink.INPUT) @SendTo(Source.OUTPUT)//定义回执发送的消息通道 public String receive(Book playload) {    logger.info("Received:" + playload);    return "receive msg :" + playload; }

方法的返回值就是回执消息,回执消息在系统默认的output通道中,我们如果想要接收这个消息,当然就要监听这个通道,如下:

@StreamListener(Source.OUTPUT) public void receive2(String msg) {    System.out.println("msg:"+msg); }

当然要记得Source类也要在@EnableBinding注解中进行绑定。此时运行结果如下:

 

消费组

由于我们的服务可能会有多个实例同时在运行,如果不做任何设置,此时发送一条消息将会被所有的实例接收到,但是有的时候我们可能只希望消息被一个实例所接收,这个需求我们可以通过消息分组来解决。方式很简单,给项目配置消息组和主题,如下:

spring.cloud.stream.bindings.mychannel.group=g1 spring.cloud.stream.bindings.mychannel.destination=dest1

这里我们设置该工程都属于g1消费组,输入通道的主题名则为dest1。这里配置完成之后,我们在消息发送方做如下配置:

spring.cloud.stream.bindings.mychannel.destination=dest1

也配置消息主题名为dest1(如果发送和接收就在同一个应用中,则这里可以不配置)。OK,此时我们将我们的项目启动两个实例,注意两个实例的端口不一样,此时如果我们再发送消息,则只会被两个实例中的一个接收到,另外一个应用则接收不到,但是到底是两个实例中的哪一个接收,则是不确定的。

消息分区

有的时候,我们可能需要相同特征的消息能够总是被发送到同一个消费者上去处理,如果我们只是单纯的使用消费组则无法实现功能,此时我们需要借助于消息分区,消息分区之后,具有相同特征的消息就可以总是被同一个消费者处理了,配置方式如下(这里的配置都是在消费组的配置基础上完成的):

在消费者上添加如下配置:

spring.cloud.stream.bindings.mychannel.consumer.partitioned=true spring.cloud.stream.instance-count=2 spring.cloud.stream.instance-index=0

关于这个配置我说三点:

1.第一行表示开启消息分区
2.第二行表示当前消息者的总的实例个数
3.第三行表示当前实例的索引,从0开始,当我们启动多个实例时,需要在启动时在命令行配置索引

然后在消息生产者上添加如下配置:

spring.cloud.stream.bindings.mychannel.producer.partitionKeyExpression=payload spring.cloud.stream.bindings.mychannel.producer.partitionCount=2

第一行配置设置了分区键的表达式规则,第二行则设置了消息分区数量。

OK,此时我们再次启动多个消费者实例,然后重复发送多条消息,这些消息都将被同一个消费者处理掉。

Spring Cloud Stream使用细节我们就先说到这里,有问题欢迎留言讨论。

参考资料:

1.《Spring Cloud微服务实战》

更多JavaEE资料请关注公众号:


    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存