博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑
阅读量:6113 次
发布时间:2019-06-21

本文共 2970 字,大约阅读时间需要 9 分钟。

hot3.png

应用场景

有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的:

@StreamListener(value = TestTopic.INPUT)public void receiveV1(String payload, @Header("version") String version) {    if("1.0".equals(version)) {        // Version 1.0    }    if("2.0".equals(version)) {        // Version 2.0    }}

那么当消息处理逻辑复杂的时候,这段逻辑就会变得特别复杂。针对这个问题,在@StreamListener注解中提供了一个不错的属性condition,可以用来优化这样的处理结构。

动手试试

下面通过编写一个简单的例子来具体体会一下这个属性的用法:

@EnableBinding(TestApplication.TestTopic.class)@SpringBootApplicationpublic class TestApplication {    public static void main(String[] args) {        SpringApplication.run(TestApplication.class, args);    }    @RestController    static class TestController {        @Autowired        private TestTopic testTopic;        /**         * 消息生产接口         *         * @param message         * @return         */        @GetMapping("/sendMessage")        public String messageWithMQ(@RequestParam String message) {            testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "1.0").build());            testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "2.0").build());            return "ok";        }    }    /**     * 消息消费逻辑     */    @Slf4j    @Component    static class TestListener {        @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='1.0'")        public void receiveV1(String payload, @Header("version") String version) {            log.info("Received v1 : " + payload + ", " + version);        }        @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='2.0'")        public void receiveV2(String payload, @Header("version") String version) {            log.info("Received v2 : " + payload + ", " + version);        }    }    interface TestTopic {        String OUTPUT = "example-topic-output";        String INPUT = "example-topic-input";        @Output(OUTPUT)        MessageChannel output();        @Input(INPUT)        SubscribableChannel input();    }}

内容很简单,既包含了消息的生产,也包含了消息消费。在/sendMessage接口的定义中,发送了两条消息,一条消息的头信息中包含version=1.0,另外一条消息的头信息中包含version=2.0。在消息监听类TestListener中,对TestTopic.INPUT通道定义了两个@StreamListener,这两个监听逻辑有不同的condition,这里的表达式表示会根据消息头信息中的version值来做不同的处理逻辑分发。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名),比如:

spring.cloud.stream.bindings.example-topic-input.destination=test-topicspring.cloud.stream.bindings.example-topic-input.group=stream-content-routespring.cloud.stream.bindings.example-topic-output.destination=test-topic

完成了上面配置之后,就可以启动应用,并尝试访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了。此时可以看到类似下面的日志:

2018-12-24 15:50:33.361  INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener  : Received v1 : hello, 1.02018-12-24 15:50:33.363  INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener  : Received v2 : hello, 2.0

从日志中可以看到,两条带有不同头信息的消息,分别通过不同的监听处理逻辑输出了对应的日志打印。

本文首发:

转载于:https://my.oschina.net/didispace/blog/2994382

你可能感兴趣的文章
[物理学与PDEs]第3章习题1 只有一个非零分量的磁场
查看>>
深入浅出NodeJS——数据通信,NET模块运行机制
查看>>
onInterceptTouchEvent和onTouchEvent调用时序
查看>>
android防止内存溢出浅析
查看>>
4.3.3版本之引擎bug
查看>>
SQL Server表分区详解
查看>>
使用FMDB最新v2.3版本教程
查看>>
SSIS从理论到实战,再到应用(3)----SSIS包的变量,约束,常用容器
查看>>
STM32启动过程--启动文件--分析
查看>>
垂死挣扎还是涅槃重生 -- Delphi XE5 公布会归来感想
查看>>
淘宝的几个架构图
查看>>
Android扩展 - 拍照篇(Camera)
查看>>
JAVA数组的定义及用法
查看>>
充分利用HTML标签元素 – 简单的xtyle前端框架
查看>>
设计模式(十一):FACADE外观模式 -- 结构型模式
查看>>
iOS xcodebuile 自动编译打包ipa
查看>>
程序员眼中的 SQL Server-执行计划教会我如何创建索引?
查看>>
【BZOJ】1624: [Usaco2008 Open] Clear And Present Danger 寻宝之路(floyd)
查看>>
cmake总结
查看>>
数据加密插件
查看>>