查看原文
其他

Spring Cloud Stream如何消费自己生产的消息?

翟永超 程序猿DD 2019-07-13

在上一篇《Spring Cloud Stream如何处理消息重复消费?》中,我们通过消费组的配置解决了多实例部署情况下消息重复消费这一入门时的常见问题。本文将继续说说在另外一个被经常问到的问题:如果微服务生产的消息自己也想要消费一份,应该如何实现呢?


常见错误 


在放出标准答案前,先放出一个常见的错误姿势和告警信息(以便您可以通过搜索引擎找到这里^_^)。以下错误基于Spring Boot 2.0.5、Spring Cloud Finchley SR1。


首先,根据入门示例,为了生产和消费消息,需要定义两个通道:一个输入、一个输出。比如下面这样:

  1. public interface TestTopic {

  2.    String OUTPUT = "example-topic";

  3.    String INPUT = "example-topic";

  4.    @Output(OUTPUT)

  5.    MessageChannel output();

  6.    @Input(INPUT)

  7.    SubscribableChannel input();

  8. }


通过INPUT和OUTPUT使用相同的名称,让生产消息和消费消息指向相同的Topic,从而实现消费自己发出的消息。


接下来,创建一个HTTP接口,并通过上面定义的输出通道触来生产消息,比如:

  1. @Slf4j

  2. @RestController

  3. public class TestController {

  4.    @Autowired

  5.    private TestTopic testTopic;

  6.    @GetMapping("/sendMessage")

  7.    public String messageWithMQ(@RequestParam String message) {

  8.        testTopic.output().send(MessageBuilder.withPayload(message).build());

  9.        return "ok";

  10.    }

  11. }


已经有生产消息的实现,下面来创建对输入通道的监听,以实现消息的消费逻辑。

  1. @Slf4j

  2. @Component

  3. public class TestListener {

  4.    @StreamListener(TestTopic.INPUT)

  5.    public void receive(String payload) {

  6.        log.info("Received: " + payload);

  7.        throw new RuntimeException("BOOM!");

  8.    }

  9. }


最后,在应用主类中,使用@EnableBinding注解来开启它,比如:

  1. @EnableBinding(TestTopic.class)

  2. @SpringBootApplication

  3. public class TestApplication {

  4.    public static void main(String[] args) {

  5.        SpringApplication.run(TestApplication.class, args);

  6.    }

  7. }


看似天衣无缝的操作,然而在启动的瞬间,你可能收到了下面这样的错误:

  1. org.springframework.beans.factory.BeanDefinitionStoreException: Invalid bean definition with name 'example-topic' defined in com.didispace.stream.TestTopic: bean definition with this name already exists - Root bean: class [null]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=com.didispace.stream.TestTopic; factoryMethodName=input; initMethodName=null; destroyMethodName=null

  2.    at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinition(BindingBeanDefinitionRegistryUtils.java:64) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]

  3.    at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerOutputBindingTargetBeanDefinition(BindingBeanDefinitionRegistryUtils.java:54) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]

  4.    at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.lambda$registerBindingTargetBeanDefinitions$0(BindingBeanDefinitionRegistryUtils.java:86) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]

  5.    at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:562) ~[spring-core-5.0.9.RELEASE.jar:5.0.9.RELEASE]

  6.    at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:541) ~[spring-core-5.0.9.RELEASE.jar:5.0.9.RELEASE]

  7.    at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinitions(BindingBeanDefinitionRegistryUtils.java:76) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]

  8.    at org.springframework.cloud.stream.config.BindingBeansRegistrar.registerBeanDefinitions(BindingBeansRegistrar.java:45) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]

  9.    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.lambda$loadBeanDefinitionsFromRegistrars$1(ConfigurationClassBeanDefinitionReader.java:358) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]

  10.    at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) ~[na:1.8.0_151]

  11.    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsFromRegistrars(ConfigurationClassBeanDefinitionReader.java:357) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]

  12.    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsForConfigurationClass(ConfigurationClassBeanDefinitionReader.java:145) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]

  13.    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitions(ConfigurationClassBeanDefinitionReader.java:117) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]

  14.    at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:328) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]

  15.    at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:233) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]

  16.    at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanDefinitionRegistryPostProcessors(PostProcessorRegistrationDelegate.java:271) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]

  17.    at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(PostProcessorRegistrationDelegate.java:91) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]

  18.    at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:694) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]

  19.    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:532) ~[spring-context-5.0.9.RELEASE.jar:5.0.9.RELEASE]

  20.    at org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.refresh(ReactiveWebServerApplicationContext.java:61) ~[spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]

  21.    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:780) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]

  22.    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:412) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]

  23.    at org.springframework.boot.SpringApplication.run(SpringApplication.java:333) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]

  24.    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1277) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]

  25.    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1265) [spring-boot-2.0.5.RELEASE.jar:2.0.5.RELEASE]

  26.    at com.didispace.stream.TestApplication.main(TestApplication.java:13) [classes/:na]


正确姿势 


根据错误提示:Invalid bean definition with name 'example-topic' defined in com.didispace.stream.TestTopic: bean definition with this name already exists,没有启动成功的原因是已经存在了一个名为example-topic的Bean,那么为什么会重复创建这个Bean呢?


实际上,在F版的Spring Cloud Stream中,当我们使用@Output和@Input注解来定义消息通道时,都会根据传入的通道名称来创建一个Bean。而在上面的例子中,我们定义的@Output和@Input名称是相同的,因为我们系统输入和输出是同一个Topic,这样才能实现对自己生产消息的消费。


既然这样,我们定义相同的通道名是行不通了,那么我们只能通过定义不同的通道名,并为这两个通道配置相同的目标Topic来将这一对输入输出指向同一个实际的Topic。对于上面的错误程序,只需要做如下两处改动:


第一步:修改通道名,使用不同的名字

  1. public interface TestTopic {

  2.    String OUTPUT = "example-topic-output";

  3.    String INPUT = "example-topic-input";

  4.    @Output(OUTPUT)

  5.    MessageChannel output();

  6.    @Input(INPUT)

  7.    SubscribableChannel input();

  8. }


第二步:在配置文件中,为这两个通道设置相同的Topic名称,比如

  1. spring.cloud.stream.bindings.example-topic-input.destination=aaa-topic

  2. spring.cloud.stream.bindings.example-topic-output.destination=aaa-topic


这样,这两个输入输出通道就会都指向名为aaa-topic的Topic了。


最后,再启动该程序,没有报错。

然后访问接口:localhost:8080/sendMessage?message=hello-didi,可以在控制台中看到如下信息:

  1. 2018-11-17 23:24:10.425  INFO 32039 --- [ctor-http-nio-2] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]

  2. 2018-11-17 23:24:10.453  INFO 32039 --- [ctor-http-nio-2] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory.publisher#266753da:0/SimpleConnection@627fba83 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 60752]

  3. 2018-11-17 23:24:10.458  INFO 32039 --- [ctor-http-nio-2] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (aaa-topic.anonymous.fNUxZ8C0QIafxrhkFBFI1A) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.

  4. 2018-11-17 23:24:10.483  INFO 32039 --- [IafxrhkFBFI1A-1] com.didispace.stream.TestListener        : Received: hello-didi


消费自己生产的消息成功了!读者也还可以访问一下应用的/actuator/beans端点,看看当前Spring上下文中有哪些Bean,应该可以看到有下面Bean,也就是上面分析的两个通道的Bean对象。


  1. "example-topic-output": {

  2.    "aliases": [],

  3.    "scope": "singleton",

  4.    "type": "org.springframework.integration.channel.DirectChannel",

  5.    "resource": null,

  6.    "dependencies": []

  7. },

  8. "example-topic-input": {

  9.    "aliases": [],

  10.    "scope": "singleton",

  11.    "type": "org.springframework.integration.channel.DirectChannel",

  12.    "resource": null,

  13.    "dependencies": []

  14. },


-END-

 近期热文:

关注我

点击“阅读原文”,看本号其他精彩内容

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存