我正在使用RabbitMQ的循环功能来在多个消费者之间发送消息,但是一次只能收到一个消息.

我的问题是我的消息代表任务,我想在我的消费者身上有本地会话(状态).我事先知道哪些消息属于哪个会话,但是我不知道使用我指定的算法使RabbitMQ向消费者发送什么是最好的方法(或者有什么方法?).

我不想写我自己的编排服务,因为它将成为一个瓶颈,我不希望我的制作人知道哪个消费者会收到他们的消息,因为我会失去使用兔子的解耦.

有没有办法使RabbitMQ基于预定义的算法/规则而不是循环方式向消费者发送消息?

澄清:我用不同语言编写的几个微服务器,每个服务都有自己的工作.我们之间使用protobuf消息进行通信.我给每个新消息一个UUID.如果消费者收到消息,它可以从中创建一个响应消息(这可能不是正确的术语,因为生产者和消费者被解耦,并且他们彼此不了解),并且该UUID被复制到新消息.这形成一个数据转换流水线,该“进程”由UUID(processId)标识.我的问题是,有可能我有多个工作的消费者,如果以前看到它,我需要一个工作者坚持一个UUID.我有这个需要,因为

>每个进程可能有本地状态
>进程完成后,我想清理本地状态
>微服务器可能会收到同一进程的多条消息,我需要区分哪个消息属于哪个进程

由于RabbitMQ在使用循环的工作人员之间分配任务,因此我无法强制我的进程坚持工作.我有几个注意事项:

生产者与消费者脱钩,所以直接信息不是一个选择
>工作人员的数量不是常数(有一个负载平衡器可能会启动一个工作的新实例)

如果有一个解决方案不涉及更改循环算法,并且不会破坏我的约束,那也是可以的!

解决方法

如果你不想去编排业务,你可以尝试这样的拓扑:

为了简单起见,我假设您的processId用作路由密钥(在现实世界中,您可能希望将其存储在标题中,并使用标题交换).

传入消息将被接收的Exchange(类型:直接)接受,其中alternative-exchange属性设置为指向无会话交换(扇出).

这是RabbitMQ文档在“替代交易”中所说的:

It is sometimes desirable to let clients handle messages that an exchange was unable to route (i.e. either because there were no bound queues our no matching bindings).

Typical examples of this are

  • detecting when clients accidentally or malicIoUsly publish messages that cannot be routed
  • “or else” routing semantics where some messages are handled specially and the rest by a generic handler

RabbitMQ’s Alternate Exchange (“AE”) feature addresses these use cases.

(我们对这里的用例特别感兴趣)

每个消费者将创建它自己的队列,并将其绑定到传入Exchange,使用到目前为止知道的会话的processId作为绑定的路由密钥.

这样一来,它只会收到有兴趣的会话的消息.

另外,所有的消费者都将绑定到共享的无会话队列.

如果有一个以前未知的processId的消息进来,那么它将不会对与Incoming Exchange进行注册的特定绑定,因此它将被重新路由到No Session Exchange =>没有会话队列,并以通常(循环)方式发送到其中一个消费者.

然后,消费者将使用传入交换(即启动新的“会话”)为其注册新的绑定,以便随后将使用此processId获取所有后续消息.

一旦“会话”结束,它将不得不删除相应的绑定(即关闭“会话”).

java – 可以使用自定义算法调度消息,而不是使用RabbitMQ进行循环?的更多相关文章

  1. HTML5实现直播间评论滚动效果的代码

    这篇文章主要介绍了HTML5实现直播间评论滚动效果的代码,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  2. 前端监听websocket消息并实时弹出(实例代码)

    这篇文章主要介绍了前端监听websocket消息并实时弹出,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  3. HTML5之消息通知的使用(Web Notification)

    通知可以说是web中比较常见且重要的功能,私信、在线提问、或者一些在线即时通讯工具我们总是希望第一时间知道对方有了新的反馈。本篇文章主要介绍了HTML5之消息通知的使用(Web Notification),感兴趣的小伙伴们可以参考一下

  4. HTML5中的Web Notification桌面通知功能的实现方法

    这篇文章主要介绍了HTML5中的Web Notification桌面通知功能的实现方法,需要的朋友可以参考下

  5. HTML5仿微信聊天界面、微信朋友圈实例代码

    小编最近开发一个基于html5开发的一个微信聊天前端界面,功能很全面,下面小编给大家分享实例代码,需要的朋友参考下

  6. HTML5的postMessage的使用手册

    HTML5提出了一个新的用来跨域传值的方法,即postMessage,这篇文章主要介绍了HTML5的postMessage的使用手册的相关资料,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

  7. ios – Testflight无法安装应用程序

    我有几个测试人员注册了testflight并连接了他们的设备……他们有不同的ios型号……但是所有这些都有同样的问题.当他们从“safari”或“testflight”应用程序本身单击应用程序的安装按钮时……达到约90%并出现错误消息…

  8. xcode找不到匹配的配置文件

    我有一个AdhociOS应用程序,它给了我“在xcode6中找不到匹配的配置文件”,我创建了一个Adhoc配置文件,下载它,双击它并在General–Identity下选择了一个团队.但我接着得到了那条消息,并尝试使用“修复问题”按钮没有帮助.在构建设置–供应配置文件–发布我有“自动”.任何人都可以帮助我,我完全迷失了……

  9. ios – Reactive Cocoa – 以编程方式设置文本时不会调用UITextView的rac_textSignal

    我正在实现一个聊天UI,并使用ReactiveCocoa根据用户的类型调整聊天气泡的大小.目前,我正在基于textview的rac_textSignal更新UI的布局.一切都很好–除了一点:当用户发送消息时,我以编程方式清除文本字段:…我是否需要拥有一个持有currentTypedString的Nsstring,并在该字符串更新时驱动UI更改?

  10. ios – 当我关闭应用程序时,我从调试器获得消息:由于信号15而终止

    我怎么能解决这个问题,我不知道这个链接MypreviousproblemaboutCoredata对我的问题有影响吗?当我cmd应用程序的Q时,将出现此消息.Messagefromdebugger:Terminatedduetosignal15如果谁知道我以前的问题的解决方案,请告诉我.解决方法>来自调试器的消息:每当用户通过CMD-Q(退出)或STOP手动终止应用程序(无论是在iOS模拟器中还是

随机推荐

  1. 基于EJB技术的商务预订系统的开发

    用EJB结构开发的应用程序是可伸缩的、事务型的、多用户安全的。总的来说,EJB是一个组件事务监控的标准服务器端的组件模型。基于EJB技术的系统结构模型EJB结构是一个服务端组件结构,是一个层次性结构,其结构模型如图1所示。图2:商务预订系统的构架EntityBean是为了现实世界的对象建造的模型,这些对象通常是数据库的一些持久记录。

  2. Java利用POI实现导入导出Excel表格

    这篇文章主要为大家详细介绍了Java利用POI实现导入导出Excel表格,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  3. Mybatis分页插件PageHelper手写实现示例

    这篇文章主要为大家介绍了Mybatis分页插件PageHelper手写实现示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  4. (jsp/html)网页上嵌入播放器(常用播放器代码整理)

    网页上嵌入播放器,只要在HTML上添加以上代码就OK了,下面整理了一些常用的播放器代码,总有一款适合你,感兴趣的朋友可以参考下哈,希望对你有所帮助

  5. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  6. Java异常Exception详细讲解

    异常就是不正常,比如当我们身体出现了异常我们会根据身体情况选择喝开水、吃药、看病、等 异常处理方法。 java异常处理机制是我们java语言使用异常处理机制为程序提供了错误处理的能力,程序出现的错误,程序可以安全的退出,以保证程序正常的运行等

  7. Java Bean 作用域及它的几种类型介绍

    这篇文章主要介绍了Java Bean作用域及它的几种类型介绍,Spring框架作为一个管理Bean的IoC容器,那么Bean自然是Spring中的重要资源了,那Bean的作用域又是什么,接下来我们一起进入文章详细学习吧

  8. 面试突击之跨域问题的解决方案详解

    跨域问题本质是浏览器的一种保护机制,它的初衷是为了保证用户的安全,防止恶意网站窃取数据。那怎么解决这个问题呢?接下来我们一起来看

  9. Mybatis-Plus接口BaseMapper与Services使用详解

    这篇文章主要为大家介绍了Mybatis-Plus接口BaseMapper与Services使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  10. mybatis-plus雪花算法增强idworker的实现

    今天聊聊在mybatis-plus中引入分布式ID生成框架idworker,进一步增强实现生成分布式唯一ID,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部