2021年7月22日星期四

Spring Cloud Stream使用

Spring Cloud Stream对Spring Cloud体系中的Mq进⾏了很好的上层抽象,可以让我们与具体消息中间件解耦合,屏蔽掉了底层具体MQ消息中间件的细节差异,就像Hibernate屏蔽掉了具体数据库(Mysql/Oracle⼀样)。如此⼀来,我们学习、开发、维护MQ都会变得轻松。⽬前Spring Cloud Stream原生⽀持RabbitMQ和Kafka,阿里在这个基础上提供了RocketMQ的支持

简单使用Spring Cloud Stream 构建基于RocketMQ的生产者和消费者

生产者

pom文件中加入依赖

 <dependencies>  <dependency>   <groupId>org.springframework.cloud</groupId>   <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>  </dependency>  <dependency>   <groupId>com.alibaba.cloud</groupId>   <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>   <version>2.1.0.RELEASE</version>  </dependency> </dependencies>

配置文件中增加关于Spring Cloud Stream binder和bindings的配置

spring: application: name: zhao-cloud-stream-producer cloud: stream:  rocketmq:  binder:   name-server: 127.0.0.1:9876  bindings:   output:   producer:    group: test    sync: true  bindings:  output:   destination: stream-test-topic   content-type: text/plain # 内容格式。这里使用 JSON

其中destination代表生产的数据发送到的topic
然后定义一个channel用于数据发送

import org.springframework.cloud.stream.annotation.Output;import org.springframework.messaging.MessageChannel;public interface TestChannel { @Output("output") MessageChannel output();}

最后构造数据发送的接口

@Controllerpublic class SendMessageController { @Resource private TestChannel testChannel; @ResponseBody @RequestMapping(value = "send", method = RequestMethod.GET) public String sendMessage() {  String messageId = UUID.randomUUID().toString();  Message<String> message = MessageBuilder    .withPayload("this is a test:" + messageId)    .setHeader(MessageConst.PROPERTY_TAGS, "test")    .build();  try {   testChannel.output().send(message);   return messageId + "发送成功";  } catch (Exception e) {   return messageId + "发送失败,原因:" + e.getMessage();  } }}

消费者

消费者的pom引入与生产者相同,在此不再赘述,配置时需要将stream的output修改为input并修改对应属性

spring: application: name: zhao-cloud-stream-consumer cloud: stream:  rocketmq:  binder:   name-server: 127.0.0.1:9876  bindings:   input:   consumer:    tags: test  bindings:  input:   destination: stream-test-topic   content-type: text/plain # 内容格式。这里使用 JSON   group: test

另外关于channel的构造也要做同样的修改

import org.springframework.cloud.stream.annotation.Input;import org.springframework.messaging.SubscribableChannel;public interface TestChannel { @Input("input") SubscribableChannel input();}

最后我在启动类中对收到的消息进行了监听

 @StreamListener("input") public void receiveInput(@Payload Message message) throws ValidationException {  System.out.println("input1 receive: " + message.getPayload() + ", foo header: " + message.getHeaders().get("foo")); }

测试结果
file

file

Stream其他特性

消息发送失败的处理

消息发送失败后悔发送到默认的一个"topic.errors"的channel中(topic是配置的destination)。要配置消息发送失败的处理,需要将错误消息的channel打开
消费者配置如下

spring: application: name: zhao-cloud-stream-producer cloud: stream:  rocketmq:  binder:   name-server: 127.0.0.1:9876  bindings:   output:   producer:    group: test    sync: true  bindings:  output:   destination: stream-test-topic   content-type: text/plain # 内容格式。这里使用 JSON   producer:   errorChannelEnabled: true

在启动类中配置错误消息的Channel信息

 @Bean("stream-test-topic.errors") MessageChannel testoutPutErrorChannel(){  return new PublishSubscribeChannel(); }

新建异常处理service

import org.springframework.integration.annotation.ServiceActivator;import org.springframework.messaging.Message;import org.springframework.stereotype.Service;@Servicepublic class ErrorProducerService { @ServiceActivator(inputChannel = "stream-test-topic.errors") public void receiveProducerError(Message message){  System.out.println("receive error msg :"+message); }}

当发生异常时,由于测试类中已经将异常捕获,处理发送异常主要是在这里进行。模拟,应用与rocketMq断开的场景。可见
file
:http://www.30bags.com/a/245834.html
唐蕃古道雅家梗 甘孜高原静谧之地:http://www.30bags.com/a/430153.html
唐家共乐园好玩吗?要不要门票?:http://www.30bags.com/a/396779.html
唐卡:绝无仅有的艺术(图) - :http://www.30bags.com/a/407065.html
口述:异地寻夫 却遭他和小三的毒打(4/4):http://lady.shaoqun.com/a/123608.html
少妇口述:乱性丈夫将我拖进换妻游戏:http://lady.shaoqun.com/m/a/73636.html
口述实录:少妇实录:邻居用销魂的床技将我征服:http://www.30bags.com/m/a/249951.html
深圳YOLO七夕国潮电音节有什么好玩的:http://www.30bags.com/a/517480.html
深圳航展2021报名入口:http://www.30bags.com/a/517481.html
520来了,大学cp"示爱"有多甜?:http://lady.shaoqun.com/a/428416.html
离婚时房子给了儿子,父亲还能享受居住权吗?:http://lady.shaoqun.com/a/428417.html

没有评论:

发表评论