查看原文
其他

Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

翟永超 程序猿DD 2019-07-13

应用场景 有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的:

  1. @StreamListener(value = TestTopic.INPUT)

  2. public void receiveV1(String payload, @Header("version") String version) {

  3.    if("1.0".equals(version)) {

  4.        // Version 1.0

  5.    }

  6.    if("2.0".equals(version)) {

  7.        // Version 2.0

  8.    }

  9. }

那么当消息处理逻辑复杂的时候,这段逻辑就会变得特别复杂。针对这个问题,在@StreamListener注解中提供了一个不错的属性condition,可以用来优化这样的处理结构。

动手试试 下面通过编写一个简单的例子来具体体会一下这个属性的用法:

  1. @EnableBinding(TestApplication.TestTopic.class)

  2. @SpringBootApplication

  3. public class TestApplication {


  4.    public static void main(String[] args) {

  5.        SpringApplication.run(TestApplication.class, args);

  6.    }


  7.    @RestController

  8.    static class TestController {


  9.        @Autowired

  10.        private TestTopic testTopic;


  11.        /**

  12.         * 消息生产接口

  13.         *

  14.         * @param message

  15.         * @return

  16.         */

  17.        @GetMapping("/sendMessage")

  18.        public String messageWithMQ(@RequestParam String message) {

  19.            testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "1.0").build());

  20.            testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "2.0").build());

  21.            return "ok";

  22.        }


  23.    }


  24.    /**

  25.     * 消息消费逻辑

  26.     */

  27.    @Slf4j

  28.    @Component

  29.    static class TestListener {


  30.        @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='1.0'")

  31.        public void receiveV1(String payload, @Header("version") String version) {

  32.            log.info("Received v1 : " + payload + ", " + version);

  33.        }


  34.        @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='2.0'")

  35.        public void receiveV2(String payload, @Header("version") String version) {

  36.            log.info("Received v2 : " + payload + ", " + version);

  37.        }


  38.    }


  39.    interface TestTopic {


  40.        String OUTPUT = "example-topic-output";

  41.        String INPUT = "example-topic-input";


  42.        @Output(OUTPUT)

  43.        MessageChannel output();


  44.        @Input(INPUT)

  45.        SubscribableChannel input();


  46.    }


  47. }

内容很简单,既包含了消息的生产,也包含了消息消费。在/sendMessage接口的定义中,发送了两条消息,一条消息的头信息中包含version=1.0,另外一条消息的头信息中包含version=2.0。在消息监听类TestListener中,对TestTopic.INPUT通道定义了两个@StreamListener,这两个监听逻辑有不同的condition,这里的表达式表示会根据消息头信息中的version值来做不同的处理逻辑分发。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名),比如:

  1. spring.cloud.stream.bindings.example-topic-input.destination=test-topic

  2. spring.cloud.stream.bindings.example-topic-input.group=stream-content-route

  3. spring.cloud.stream.bindings.example-topic-output.destination=test-topic

完成了上面配置之后,就可以启动应用,并尝试访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了。此时可以看到类似下面的日志:

  1. 2018-12-24 15:50:33.361  INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener  : Received v1 : hello, 1.0

  2. 2018-12-24 15:50:33.363  INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener  : Received v2 : hello, 2.0

从日志中可以看到,两条带有不同头信息的消息,分别通过不同的监听处理逻辑输出了对应的日志打印。

·END·

 近期热文:


看完,赶紧点个“好看”鸭

点鸭点鸭

↓↓↓↓

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存