前言

现在的一些互联网项目或者是高并发的项目中很少有没有引入消息队列的。 引入消息队列可以给这个项目带来很多的好处:比如

  • 削峰

这个就很好的理解,在系统中的请求量是固定的,但是有的时候会多出很多的突发流量,比如在有秒杀活动的时候,这种瞬时的高流量可能会打垮系统,这个时候就可以很好的引入MQ,将这些请求积压到MQ中,然后消费端在按照自已的能力去处理这里请求

  • 解耦合

比如现在有系统A,当系统A执行完成后,B、C系统需要拿到A系统的结果才可以继续执行,如果不引入MQ,A系统还要调用B、C系统,这样这A、B、C三个系统的耦合性就很大。引入MQ后A系统的执行结果只需要保证将消息投递到MQ就好,其它的两个系统只需要监听这个MQ的某个队列,这样就降低了这三个系统之间的耦合性。

  • 异步

再通过A、B、C这三个系统举例。A系统在返回给用户的执行结果前需要完成B、C系统的调用,这个总的执行时间是A B C的执行时间,如果引入MQ,A系统的执行完成后将数据投递到MQ,直接响应用户。B、C再这在通过监听完成数据的处理。这样也降低了用户的等待时间

除了这些好处,当然引入MQ还会有不好的地方:比如

  • 数据一致性问题
    • A系统执行完将数据投递到了MQ,B、C在消费的时候如果出现了问题,是不是就导致了数据不一致的问题
  • 可用性降低
    • 一个好好的系统,引入一个MQ,如果这个MQ拓机了呢?这个可能就需要集群来提高MQ的高可用。
  • 系统的复杂度提高
    • 引入了MQ,我们还需要关注消息是否被成功的投递,MQ中的消息被积压太多怎么办?消费端是否成功的消费的消息。

这些都是问题,所在是否要引入MQ还需要看业务需求

RabbitMq的投递及消费流程

这里有张投递消息到消费的流程图

从这张图上可看出这也是一种AMQP协议的实现。消息的提供者先是通过某一个信道将消息发送到交换机,然后交换机通通RoutingKey来将消息分发到某一个队列上。然后,消费者在临听某一个队列来进行消息的消费。

今天我们的主题是如何保证消息的投递可靠性。那么我们来想想在这个流程中那些位置可能会影响我们消息的投递可靠性?

从上图中我们可以总结出有二个因素影响着消息是否被成功投递和被成功消费

提供者

  • 提供者有没有将消成功的发送到MQ并被处理
  • 发送到MQ中的消息有没有成功的被路由到队列中

消费者

  • 消费者有没有成功的签收消息并成功处理。
  • 消费者是否可以保证消费者的稳定性

提供者如何确保消息的成功投递

解决这个问题,我们可以通过提供者的发送方确认机制来实现,这个发送方确认机制又分成三种:

  • 单条消息的同步确认
  • 多条消息的同步确认
  • 异步消息确认

单条消息的同步确认

首先要在当前的Channel上开启消息确认模式,然后通过waitForConfirms()方法进行消息确认是否发送成功。

public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("host");
        cf.setPort(5672);
        cf.setUsername("账号");
        cf.setPassword("密码");
        try(Connection connection = cf.newConnection();
            Channel channel = connection.createChannel()){
            channel.confirmSelect();
            Map<String,String> mes = new HashMap<>();
            mes.put("name","1111");
            String messageStr = objectMapper.writeValueAsString(mes);
            channel.basicPublish(
                    "exchange.drinks",
                    "drinks.juzi",
                    null,
                    messageStr.getBytes());
            boolean isSendSuccess = channel.waitForConfirms();
            if(isSendSuccess){
                System.out.print("消息发送成功");
            }
        }
    }

这样做的话每次发完消息后,都会确保消息是否发送成功。如果发送失败的话进行相应的处理。

多条消息的同步确认

多条消息的确认和单条的差不多,比如我将发送消息的代码放到一个循环内。

public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("host");
        cf.setPort(5672);
        cf.setUsername("账号");
        cf.setPassword("密码");
        try(Connection connection = cf.newConnection();
            Channel channel = connection.createChannel()){
            channel.confirmSelect();
            Map<String,String> mes = new HashMap<>();
            mes.put("name","1111");
            String messageStr = objectMapper.writeValueAsString(mes);
            for(int i = 0;i < 100;i  ){
                channel.basicPublish(
                        "exchange.drinks",
                        "drinks.juzi",
                        null,
                        messageStr());
            }
            boolean isSendSuccess = channel.waitForConfirms();
            System.out.println(isSendSuccess);
        }
    }

这样的话当一批消息发送完成后,进行统一的消息确认是否发送成功,就成了多条的消息确认,不过并不推荐使用这种确认消息的方式

在多条的消息确认中,比如我先是发送了一批的消息,比如这批消息有100条,这个时候如果有其中的一条消息没有发送成功,这里返回的也是false,然尔我们并不能知道是具体的哪 一条消息发送失败。

异步消息确认

异步的消息确认是通过一个监听器来实现的,当消息发送后,会接着执行下面的逻辑,可能在稍会的一段时间,监听器监听到了Broker的返回,再进行逻辑的处理。

public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("host");
        cf.setPort(5672);
        cf.setUsername("账号");
        cf.setPassword("密码");
        try(Connection connection = cf.newConnection();
            Channel channel = connection.createChannel()){
            channel.confirmSelect();
            ConfirmListener confirmListener = new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("发送成功:"   deliveryTag   " multiple:"   multiple);
                }
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("发送失败:"   deliveryTag);
                }
            };
            channel.addConfirmListener(confirmListener);
            Map<String,String> mes = new HashMap<>();
            mes.put("name","11111");
            String messageStr = objectMapper.writeValueAsString(mes);
            for(int i = 0;i < 100;i  ){
                channel.basicPublish(
                        "exchange.drinks",
                        "drinks.juzi",
                        null,
                        messageStr.getBytes());
            }
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

当成功的发送消息的时候会回调监听器中的handleAck方法,如果没有发送成功会回调handleNack方法 在这个监听器里面有两个参数一个deliveryTagmultiple:

  • deliveryTag:表示当前的Channel发送的第几条消息
  • multiple:是否在确认多条消息

这个异步的虽然在听觉上感觉比较厉害些,这里也不推荐使用,原因和上面的一样,我们并不能具休的知道是哪一条消息没有被确认发送。

综上:这里更加推荐单条消息确认,具体选择哪一种还是要用业务做出选择

注:注意一点是当一条消息成功的发送到Broker,但是如果没有正确的路由到队列,那么这时borker也是会返回true,因为Broker确时接收到了消息只是RoutingKey不可达,所以这里也会返回true,并且直接将消息丢弃

消息的返回机制

这个消息返回机制的作用就是在当一个消息成功的发送,但是并没有正确路由到队列的时候所回调的。

这也弥补了上面确认消息是否发送成功但没有路由到队列所返回true的问题 在使用消息返回机制的时候在发送消息时需要将mandatory置成true。再添加对应的监听器。

public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("host");
        cf.setPort(5672);
        cf.setUsername("账号");
        cf.setPassword("密码");
        try(Connection connection = cf.newConnection();
            Channel channel = connection.createChannel()){
            channel.addReturnListener(new ReturnCallback() {
                @Override
                public void handle(Return returnMessage) {
                    System.out.println("replyCode:"   returnMessage.getReplyCode()   " replyText:"   returnMessage.getReplyText()   " routingKey:"
                      returnMessage.getRoutingKey()   " exchange:"   returnMessage.getExchange()   " body:"   new String(returnMessage.getBody()));
                }
            });
            Map<String,String> mes = new HashMap<>();
            mes.put("name","11111");
            String messageStr = objectMapper.writeValueAsString(mes);
            channel.basicPublish(
                    "exchange.drinks",
                    "drinks.juzi1",
                    true,
                    null,
                    messageStr.getBytes());
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

这里的addReturnListener方法有两个重载:只不过是handle的参数不同,一个是参数都显示在了参数列表内,一个是将参数封装到了Return对象内。当handle被回调的时候也可以获取到相应的参数比如:exchange routingkey body。

注:保证消息可靠性投递的前提是服务的高可用,服务不高可用谈其它的都是扯

以上就是详解RabbitMq如何做到消息的可靠性投递的详细内容,更多关于RabbitMq 消息可靠性投递的资料请关注Devmax其它相关文章!

详解RabbitMq如何做到消息的可靠性投递的更多相关文章

  1. python操作RabbitMq的三种工作模式

    这篇文章主要为大家介绍了python操作RabbitMq的三种工作模式,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步早日升职加薪

  2. springboot-rabbitmq-reply 消息直接回复模式详情

    这篇文章主要介绍了springboot-rabbitmq-reply消息直接回复模式详情,文章通过围绕主题展开详细的内容介绍,具有一定的参考价值,感兴趣的小伙伴可以参考一下

  3. SpringBoot+RabbitMQ 实现死信队列的示例

    本文主要介绍了SpringBoot+RabbitMQ 实现死信队列的示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

  4. PHP实现RabbitMQ消息列队的示例代码

    众所周知,php本身的运行效率存在一定的缺陷,所以如果有一个很复杂很耗时的业务时,必须开发一个常驻内存的程序。本文将利用PHP实现RabbitMQ消息列队,感兴趣的可以了解一下

  5. Java RabbitMQ消息队列详解常见问题

    消息队列是最古老的中间件之一,从系统之间有通信需求开始,就自然产生了消息队列。本文告诉什么是消息队列,为什么需要消息队列,常见的消息队列有哪些,RabbitMQ的部署和使用

  6. SpringBoot整合Canal与RabbitMQ监听数据变更记录

    这篇文章主要介绍了SpringBoot整合Canal与RabbitMQ监听数据变更记录,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下

  7. RabbitMQ消息确认机制剖析

    这篇文章主要为大家介绍了RabbitMQ消息确认机制剖析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  8. python对RabbitMQ的简单入门使用教程

    RabbitMq是实现了高级消息队列协议(AMQP)的开源消息代理中间件,下面这篇文章主要给大家介绍了关于python对RabbitMQ的简单入门使用,文中通过实例代码介绍的非常详细,需要的朋友可以参考下

  9. Windows下RabbitMQ安装及配置详解

    本文主要介绍了Windows下RabbitMQ安装及配置详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

  10. springboot +rabbitmq+redis实现秒杀示例

    本文主要介绍了springboot +rabbitmq+redis实现秒杀示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

随机推荐

  1. 基于EJB技术的商务预订系统的开发

    用EJB结构开发的应用程序是可伸缩的、事务型的、多用户安全的。总的来说,EJB是一个组件事务监控的标准服务器端的组件模型。基于EJB技术的系统结构模型EJB结构是一个服务端组件结构,是一个层次性结构,其结构模型如图1所示。图2:商务预订系统的构架EntityBean是为了现实世界的对象建造的模型,这些对象通常是数据库的一些持久记录。

  2. Java利用POI实现导入导出Excel表格

    这篇文章主要为大家详细介绍了Java利用POI实现导入导出Excel表格,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  3. Mybatis分页插件PageHelper手写实现示例

    这篇文章主要为大家介绍了Mybatis分页插件PageHelper手写实现示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  4. (jsp/html)网页上嵌入播放器(常用播放器代码整理)

    网页上嵌入播放器,只要在HTML上添加以上代码就OK了,下面整理了一些常用的播放器代码,总有一款适合你,感兴趣的朋友可以参考下哈,希望对你有所帮助

  5. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  6. Java异常Exception详细讲解

    异常就是不正常,比如当我们身体出现了异常我们会根据身体情况选择喝开水、吃药、看病、等 异常处理方法。 java异常处理机制是我们java语言使用异常处理机制为程序提供了错误处理的能力,程序出现的错误,程序可以安全的退出,以保证程序正常的运行等

  7. Java Bean 作用域及它的几种类型介绍

    这篇文章主要介绍了Java Bean作用域及它的几种类型介绍,Spring框架作为一个管理Bean的IoC容器,那么Bean自然是Spring中的重要资源了,那Bean的作用域又是什么,接下来我们一起进入文章详细学习吧

  8. 面试突击之跨域问题的解决方案详解

    跨域问题本质是浏览器的一种保护机制,它的初衷是为了保证用户的安全,防止恶意网站窃取数据。那怎么解决这个问题呢?接下来我们一起来看

  9. Mybatis-Plus接口BaseMapper与Services使用详解

    这篇文章主要为大家介绍了Mybatis-Plus接口BaseMapper与Services使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  10. mybatis-plus雪花算法增强idworker的实现

    今天聊聊在mybatis-plus中引入分布式ID生成框架idworker,进一步增强实现生成分布式唯一ID,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部