SpringCloud Stream消息驱动

1、SpringCloud Stream概述

官方地址:https://spring.io/projects/spring-cloud-stream#overview

中文指导手册地址:https://m.wang1314.com/doc/webapp/topic/20971999.html

SpringCloud Stream 是一个构建消息驱动微服务的框架
应用程序通过 outputs 或 inputs 来与 SpringCloud Stream 中的 binder 对象交互
SpringCloud Stream 中的 binder 对象负责与消息中间件交互
通过 SpringCloud Stream 连接消息中间件,以实现消息事件驱动

什么是SpringCloudStream官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。

应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。通过我们配置binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。

通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

目前仅支持RabbitMQ、Kafka。

1.1、设计思想

1、标注的MQ流程

生产者/消费者之间靠消息媒介传递信息内容【massage】

消息必须走特定的通道【消息通道MessageChannel】

消息通道里的消息如何被消费呢,谁负责收发处理

消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅

2、Cloud Stream的作用

比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分区。

这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。

3、什么是Binder

在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。

通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。

Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。 4、Stream中的消息通信方式遵循了发布-订阅模式

使用Topic主题进行广播

  • 在RabbitMQ就是Exchange
  • 在Kakfa中就是Topic

1.2、标准的流程套路

1、Binder:很方便的连接中间件,屏蔽不同的差异

2、Channel

通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置

3、Source和Sink

简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

1.3、编码API和常用注解

组成和注解 描述
Middleware 中间件,目前只支持RabbitM和Kafka
Binder Binder是应用与消息中间的封装,目前实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现
@Input 注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output 注解标识输出通道,发布的消息将通过通道离开应用程序
@StreamListener 监听队列,用户消费者的队列的消息接收
@EnableBinding 指通道channel和exchange绑定在一起

2、消息驱动之生产者(output)

2.1、新建模块cloud-stream-rabbitmq-provider8801

2.2、引入pom.xml配置文件

如果是需要Stream整合的就将依赖改为spring-cloud-starter-stream-kafka

<dependencies>
    <!--stream整合rabbit依赖-->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!--基础配置-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2.3、YAML配置文件

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称,消息生产者
          destination: studyExchange # 表示要使用的Exchange名称定义【自定义】
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置【上面的配置】

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka

2.4、生产者启动类

 package com.zcl.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 描述:消息生产者启动类
 *
 * @author zhong
 * @date 2022-09-22 12:19
 */
@SpringBootApplication
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class, args);
    }
}

2.5、业务实现

2.5.1、服务接口实现类

自己创建一个实现的接口以及里面的方法

注意:在这个服务实现类里面不是使用@Service注解了,因为不是web应用,而是Stream消息驱动,是与中间件进行打交道的不是与数据库

package com.zcl.springcloud.service.Impl;

import com.zcl.springcloud.service.IMessageProvider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * 描述:发送接口实现类
 * 必须使用@EnableBinding(Source.class)注解开启消息推送管道
 *
 * @author zhong
 * @date 2022-09-22 12:24
 */
@Slf4j
@EnableBinding(Source.class)
public class IMessageProviderImpl implements IMessageProvider {

    /**
     * 消息发送管道
     */
    @Resource
    private MessageChannel output;

    /**
     * 发送消息
     * @return
     */
    @Override
    public String send() {
        // 定义消息
        String serial = UUID.randomUUID().toString();
        // 构建并发送消息
        this.output.send(MessageBuilder.withPayload(serial).build());
        log.info("-------------- "   serial   " ----------------");
        return serial;
    }
}

2.5.2、控制器实现

package com.zcl.springcloud.controller;

import com.zcl.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * 描述:消息发送控制器
 *
 * @author zhong
 * @date 2022-09-22 12:37
 */
@RestController
public class SendMessageController {

    /**
     * 注入消息发送管道接口
     */
    @Resource
    private IMessageProvider messageProvider;

    /**
     * 每调用一次接口发送一次消息
     * @return
     */
    @GetMapping(value = "/sendMessage")
    public String sendMessage()
    {
        return messageProvider.send();
    }
}

2.6、启动测试

  • 启动7001Eureka访问中心
  • 启动8801消息发送者,启动成功以及观察RabbitMQ的管理界面

3.访问接口发送消息,查看MQ的管理页面波峰情况

3、消息驱动之消费者(input)

同样的参考如下流程图

3.1、新建cloud-stream-rabbitmq-consumer8802模块

3.2、引入pom.xml依赖

与8801一样

3.3、添加YAML配置文件

配置文件与消息生产的区别在于:

output: # 这个名字是一个通道的名称
	destination: studyExchange # 表示要使用的Exchange名称定义
server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka

3.4、添加启动类StreamMQMain8802

与消息生产者一样

3.5、业务实现

必须要有@Component注解注入到Spring容器中

package com.zcl.springcloud.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
 * 描述:消息消费者控制器
 *
 * @author zhong
 * @date 2022-09-22 13:18
 */
@Slf4j
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {

    /**
     * 注入消费者的端口号
     */
    @Value("${server.port}")
    private String port;

    /**
     * 监听消息
     * @param message
     * @return
     */
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        log.info("消费者1号接收到的消息 ----- "   message.getPayload()   " -----,port: "   port);
    }
}

3.6、启动项目测试

  • 启动7001
  • 启动8801,消息发送者
  • 启动8802,消息消费者
  • 8801发送消息,8802消费消息,并查看具体的MQ波峰图

控制器输出

4、分组消费与持久化

4.1、完整参考cloud-stream-rabbitmq-consumer8802,创建8803项目

除了启动的端口号不一样之外其他的配置都一样

4.2、启动项目发现问题

  • 启动7001(Eureka服务中心)
  • 启动8801(生产)、8802(消费)、8803(消费)
  • 测试发送消失是否两个消费者都可以接收到

4.2.1、重复消费

目前是8802/8803同时都收到了,存在重复消费问题

解决方案:分组和持久化属性group

常见案例

比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决

注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。

不同组是可以全面消费的(重复消费),同一组内会发生竞争关系,只有其中一个可以消费。

4.2.2、分组

自定义配置分组,自定义分为同一个组,解决重复消费问题

配置文件分组

分别给8801、8802进行分组【orderA】

重启项目查看MQ管理

orderB是历史记录,上面的配置以及都分为了ordeerA组,进入orderA组可以查看实际的消费者数量

同一组内会发生竞争关系,只有其中一个可以消费,启动项目测试是否为真

4.2.3、持久化

通过上述,解决了重复消费问题,再看看持久化

  • 停止8802/8803并去除掉8802的分组group: atguiguA,8803保留

  • 8801先发送7条消息到rabbitmq

3.先启动8802,无分组属性配置,后台没有打出来消息

8802因为取消了groupA的分组所以获取不到持久化的数据(如果重启mq也会消失)

4.再启动8803,有分组属性配置,后台打出来了MQ上的消息

8803保存groupA的分组所以在启动的时候就会将持久化的数据消费

到此这篇关于SpringCloud Stream消息驱动的文章就介绍到这了,更多相关SpringCloud Stream消息驱动内容请搜索Devmax以前的文章或继续浏览下面的相关文章希望大家以后多多支持Devmax!

最新SpringCloud Stream消息驱动讲解的更多相关文章

  1. SpringCloud超详细讲解微服务网关Zuul基础

    这篇文章主要介绍了SpringCloud Zuul微服务网关,负载均衡,熔断和限流,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  2. SpringCloud gateway+zookeeper实现网关路由的详细搭建

    这篇文章主要介绍了SpringCloud gateway+zookeeper实现网关路由,本文通过图文实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  3. SpringCloud OpenFeign 服务调用传递 token的场景分析

    这篇文章主要介绍了SpringCloud OpenFeign 服务调用传递 token的场景分析,本篇文章简单介绍 OpenFeign 调用传递 header ,以及多线程环境下可能会出现的问题,其中涉及到 ThreadLocal 的相关知识,需要的朋友可以参考下

  4. 浅析Node.js 中 Stream API 的使用

    这篇文章给大家浅析node.js中stream api的使用,本文介绍的非常详细,涉及到node.js api,node.js stream相关知识,感兴趣的朋友可以参考下

  5. 使用IntelliJ IDEA调式Stream流的方法步骤

    本文主要介绍了使用IntelliJ IDEA调式Stream流的方法步骤,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  6. Nodejs Buffer的使用及Stream流和事件机制详解

    这篇文章主要为大家介绍了Nodejs Buffer的使用及Stream流和事件机制详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  7. Springcloud Stream消息驱动工具使用介绍

    SpringCloud Stream由一个中间件中立的核组成,应用通过SpringCloud Stream插入的input(相当于消费者consumer,它是从队列中接收消息的)和output(相当于生产者producer,它是发送消息到队列中的)通道与外界交流

  8. SpringCloud中Gateway的使用教程详解

    SpringCloud Gateway是Spring体系内的一个全新项目,它旨在为微服务架构提供一种简单有效的统一的API路由管理方式。本文就来为大家详细讲讲Gateway的使用教程,需要的可以参考一下

  9. 深入nodejs中流(stream)的理解

    本篇文章主要介绍了深入nodejs中流(stream)的理解,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

  10. 浅谈Node.js:理解stream

    本篇文章主要介绍了Node.js:stream,Stream在node.js中是一个抽象的接口,具有一定的参考价值,有需要的可以了解一下。

随机推荐

  1. 基于EJB技术的商务预订系统的开发

    用EJB结构开发的应用程序是可伸缩的、事务型的、多用户安全的。总的来说,EJB是一个组件事务监控的标准服务器端的组件模型。基于EJB技术的系统结构模型EJB结构是一个服务端组件结构,是一个层次性结构,其结构模型如图1所示。图2:商务预订系统的构架EntityBean是为了现实世界的对象建造的模型,这些对象通常是数据库的一些持久记录。

  2. Java利用POI实现导入导出Excel表格

    这篇文章主要为大家详细介绍了Java利用POI实现导入导出Excel表格,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  3. Mybatis分页插件PageHelper手写实现示例

    这篇文章主要为大家介绍了Mybatis分页插件PageHelper手写实现示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  4. (jsp/html)网页上嵌入播放器(常用播放器代码整理)

    网页上嵌入播放器,只要在HTML上添加以上代码就OK了,下面整理了一些常用的播放器代码,总有一款适合你,感兴趣的朋友可以参考下哈,希望对你有所帮助

  5. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  6. Java异常Exception详细讲解

    异常就是不正常,比如当我们身体出现了异常我们会根据身体情况选择喝开水、吃药、看病、等 异常处理方法。 java异常处理机制是我们java语言使用异常处理机制为程序提供了错误处理的能力,程序出现的错误,程序可以安全的退出,以保证程序正常的运行等

  7. Java Bean 作用域及它的几种类型介绍

    这篇文章主要介绍了Java Bean作用域及它的几种类型介绍,Spring框架作为一个管理Bean的IoC容器,那么Bean自然是Spring中的重要资源了,那Bean的作用域又是什么,接下来我们一起进入文章详细学习吧

  8. 面试突击之跨域问题的解决方案详解

    跨域问题本质是浏览器的一种保护机制,它的初衷是为了保证用户的安全,防止恶意网站窃取数据。那怎么解决这个问题呢?接下来我们一起来看

  9. Mybatis-Plus接口BaseMapper与Services使用详解

    这篇文章主要为大家介绍了Mybatis-Plus接口BaseMapper与Services使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  10. mybatis-plus雪花算法增强idworker的实现

    今天聊聊在mybatis-plus中引入分布式ID生成框架idworker,进一步增强实现生成分布式唯一ID,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部