一. 延时消息

延时消息是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

使用延时消息的典型场景,例如:

  • 在电商系统中,用户下完订单30分钟内没支付,则订单可能会被取消。
  • 在电商系统中,用户七天内没有评价商品,则默认好评。

这些场景对应的解决方案,包括:

  • 轮询遍历数据库记录
  • JDK 的 DelayQueue
  • ScheduledExecutorService
  • 基于 Quartz 的定时任务
  • 基于 Redis 的 zset 实现延时队列。

除此之外,还可以使用消息队列来实现延时消息,例如 RocketMQ。

二. RocketMQ

RocketMQ 是一个分布式消息和流数据平台,具有低延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性。RocketMQ 是2012年阿里巴巴开源的第三代分布式消息中间件。

三. RocketMQ 实现延时消息

3.1 业务背景

我们的系统完成某项操作之后,会推送事件消息到业务方的接口。当我们调用业务方的通知接口返回值为成功时,表示本次推送消息成功;当返回值为失败时,则会多次推送消息,直到返回成功为止(保证至少成功一次)。
当我们推送失败后,虽然会进行多次推送消息,但并不是立即进行。会有一定的延迟,并按照一定的规则进行推送消息。
例如:1小时后尝试推送、3小时后尝试推送、1天后尝试推送、3天后尝试推送等等。因此,考虑使用延时消息实现该功能。

3.2 生产者(Producer)

生产者负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息。

首先,定义一个支持延时发送的 AbstractProducer。

abstract class AbstractProducer :ProducerBean() {
  var producerId: String? = null
  var topic: String? = null
  var tag: String?=null
  var timeoutMillis: Int? = null
  var delaySendTimeMills: Long? = null

  val log = LogFactory.getLog(this.javaClass)

  open fun sendMessage(messageBody: Any, tag: String) {
    val msgBody = JSON.toJSONString(messageBody)
    val message = Message(topic, tag, msgBody.toByteArray())

    if (delaySendTimeMills != null) {
      val startDeliverTime = System.currentTimeMillis()   delaySendTimeMills!!
      message.startDeliverTime = startDeliverTime
      log.info( "send delay message producer startDeliverTime:${startDeliverTime}currentTime :${System.currentTimeMillis()}")
    }
    val logMessageId = buildLogMessageId(message)
    try {
      val sendResult = send(message)
      log.info(logMessageId   "producer messageId: "   sendResult.getMessageId()   "\n"   "messageBody: "   msgBody)
    } catch (e: Exception) {
      log.error(logMessageId   "messageBody: "   msgBody   "\n"   " error: "   e.message, e)
    }

  }

  fun buildLogMessageId(message: Message): String {
    return "topic: "   message.topic   "\n"  
        "producer: "   producerId   "\n"  
        "tag: "   message.tag   "\n"  
        "key: "   message.key   "\n"
  }
}

根据业务需要,增加一个支持重试机制的 Producer

@Component
@ConfigurationProperties("mqs.ons.producers.xxx-producer")
@Configuration
@Data
class CleanReportPushEventProducer :AbstractProducer() {

  lateinit var delaySecondList:List<Long>

  fun sendMessage(messageBody: CleanReportPushEventMessage){
    //重试超过次数之后不再发事件
    if (delaySecondList!=null) {

      if(messageBody.times>=delaySecondList.size){
        return
      }
      val msgBody = JSON.toJSONString(messageBody)
      val message = Message(topic, tag, msgBody.toByteArray())
      val delayTimeMills = delaySecondList[messageBody.times]*1000L
      message.startDeliverTime = System.currentTimeMillis()   delayTimeMills
      log.info( "messageBody: "   msgBody  "startDeliverTime: " message.startDeliverTime )
      val logMessageId = buildLogMessageId(message)
      try {
        val sendResult = send(message)
        log.info(logMessageId   "producer messageId: "   sendResult.getMessageId()   "\n"   "messageBody: "   msgBody)
      } catch (e: Exception) {
        log.error(logMessageId   "messageBody: "   msgBody   "\n"   " error: "   e.message, e)
      }
    }
  }
}

在 CleanReportPushEventProducer 中,超过了重试的次数就不会再发送消息了。

每一次延时消息的时间也会不同,因此需要根据重试的次数来获取这个delayTimeMills 。

通过 System.currentTimeMillis() delayTimeMills 可以设置 message 的 startDeliverTime。然后调用 send(message) 即可发送延时消息。

我们使用商用版的 RocketMQ,因此支持精度为秒级别的延迟消息。在开源版本中,RocketMQ 只支持18个特定级别的延迟消息。:(

3.3 消费者(Consumer)

消费者负责消费消息,消费者从消息服务器拉取信息并将其输入用户应用程序。

定义 Push 类型的 AbstractConsumer:

@Data
abstract class AbstractConsumer ():MessageListener{

  var consumerId: String? = null

  lateinit var subscribeOptions: List<SubscribeOptions>

  var threadNums: Int? = null

  val log = LogFactory.getLog(this.javaClass)

  override fun consume(message: Message, context: ConsumeContext): Action {
    val logMessageId = buildLogMessageId(message)
    val body = String(message.body)
    try {
      log.info(logMessageId   " body: "   body)
      val result = consumeInternal(message, context, JSON.parseObject(body, getMessageBodyType(message.tag)))
      log.info(logMessageId   " result: "   result.name)
      return result
    } catch (e: Exception) {
      if (message.reconsumeTimes >= 3) {
        log.error(logMessageId   " error: "   e.message, e)
      }
      return Action.ReconsumeLater
    }

  }

  abstract fun getMessageBodyType(tag: String): Type?

  abstract fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action

  protected fun buildLogMessageId(message: Message): String {
    return "topic: "   message.topic   "\n"  
        "consumer: "   consumerId   "\n"  
        "tag: "   message.tag   "\n"  
        "key: "   message.key   "\n"  
        "MsgId:"   message.msgID   "\n"  
        "BornTimestamp"   message.bornTimestamp   "\n"  
        "StartDeliverTime:"   message.startDeliverTime   "\n"  
        "ReconsumeTimes:"   message.reconsumeTimes   "\n"
  }
}

再定义具体的消费者,并且在消费失败之后能够再发送一次消息。

@Configuration
@ConfigurationProperties("mqs.ons.consumers.clean-report-push-event-consumer")
@Data
class CleanReportPushEventConsumer(val cleanReportService: CleanReportService,val eventProducer:CleanReportPushEventProducer):AbstractConsumer() {

  val logger: Logger = LoggerFactory.getLogger(this.javaClass)

  override fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action {
    if(obj is CleanReportPushEventMessage){
      //清除事件
      logger.info("consumer clean-report event report_id:${obj.id} ")

      //消费失败之后再发送一次消息
      if(!cleanReportService.sendCleanReportEvent(obj.id)){
        val times = obj.times 1
        eventProducer.sendMessage(CleanReportPushEventMessage(obj.id,times))
      }
    }
    return Action.CommitMessage
  }

  override fun getMessageBodyType(tag: String): Type? {
    return CleanReportPushEventMessage::class.java
  }
}

其中,cleanReportService 的 sendCleanReportEvent() 会通过 http 的方式调用业务方提供的接口,进行事件消息的推送。如果推送失败了,则会进行下一次的推送。(这里使用了 eventProducer 的 sendMessage() 方法再次投递消息,是因为要根据调用的http接口返回的内容来判断消息是否发送成功。)

最后,定义 ConsumerFactory

@Component
class ConsumerFactory(val consumers: List<AbstractConsumer>,val aliyunOnsOptions: AliyunOnsOptions) {

  val logger: Logger = LoggerFactory.getLogger(this.javaClass)


  @PostConstruct
  fun start() {
    CompletableFuture.runAsync{
      consumers.stream().forEach {
        val properties = buildProperties(it.consumerId!!, it.threadNums)
        val consumer = ONSFactory.createConsumer(properties)
        if (it.subscribeOptions != null && !it.subscribeOptions!!.isEmpty()) {
          for (options in it.subscribeOptions!!) {
            consumer.subscribe(options.topic, options.tag, it)
          }
          consumer.start()
          val message = "\n".plus(
              it.subscribeOptions!!.stream().map{ a -> String.format("topic: %s, tag: %s has been started", a.topic, a.tag)}
                  .collect(Collectors.toList<Any>()))
          logger.info(String.format("consumer: %s\n", message))
        }
      }
    }
  }

  private fun buildProperties(consumerId: String,threadNums: Int?): Properties {
    val properties = Properties()
    properties.put(PropertyKeyConst.ConsumerId, consumerId)
    properties.put(PropertyKeyConst.AccessKey, aliyunOnsOptions.accessKey)
    properties.put(PropertyKeyConst.SecretKey, aliyunOnsOptions.secretKey)
    if (StringUtils.isNotEmpty(aliyunOnsOptions.onsAddr)) {
      properties.put(PropertyKeyConst.ONSAddr, aliyunOnsOptions.onsAddr)
    } else {
      // 测试环境接入RocketMQ
      properties.put(PropertyKeyConst.NAMESRV_ADDR, aliyunOnsOptions.nameServerAddress)
    }
    properties.put(PropertyKeyConst.ConsumeThreadNums, threadNums!!)
    return properties
  }
}

四. 总结

正如本文开头曾介绍过,可以使用多种方式来实现延时消息。然而,我们的系统本身就大量使用了 RocketMQ,借助成熟的 RocketMQ 实现延时消息不失为一种可靠而又方便的方式。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持Devmax。

使用Kotlin+RocketMQ实现延时消息的示例代码的更多相关文章

  1. Kotlin难点解析:extension和this指针

    扩展是Kotlin语言中使用非常简单的一个特性。关于这个问题,其实我之前的一篇文章[[Kotlin]LambdaandExtension](https://www.jianshu.com/p/d7a...中有提到过。为了解决这个问题,官方提出了两个新的概念:dispatchreceiver和extensionreceiver。extensionreceiver:中文翻译为扩展接收者。为了简化,这里我们将dispatchreceiver简称为DR,将extensionreceiver简称为ER。如果你习惯了

  2. android – Kotlin类NoClassDefFoundError崩溃

    我有一个使用以下库的现有Android项目:>Autovalue>Dagger2>RxJava>Retrolambda我正在尝试添加Kotlin支持,以便我可以将项目慢慢迁移到Kotlin.这就是我所做的.>添加了Kotlin依赖.>将其中一个类转换为Kt类并转移到src/main/kotlin/..包中.>在源集中添加了kotlin.sourceSets{main.java.srcDirs=’s

  3. android – Kotlin和Dagger2

    我正在尝试将Kotlin添加到我的项目中,但在启用Kotlin之后我无法构建,因为Dagger2类不再生成.我尝试了第二个项目,我有同样的问题.这些是我为启用Kotlin所做的改变:项目build.gradle:Appbuild.gradle:错误发生在这里:其中不再定义DaggerObjectGraph.任何帮助将不胜感激.解决方法只需删除

  4. android – 在Kotlin中不能使用argb color int值吗?

    当我想在Kotlin中为TextView的textColor设置动画时:发生此错误:似乎在Kotlin中不能将值0xFF8363FF和0xFFC953BE强制转换为Int,但是,它在Java中是正常的:有任何想法吗?提前致谢.解决方法0xFF8363FF是Long,而不是Int.你必须明确地将它们转换为Int:关键是0xFFC953BE的数值是4291384254,因此它应该存储在Long变量中.但这里的高位是符号位,表示负数:-3583042,可以存储在Int中.这就是两种语言之间的区别.在Kotlin

  5. 什么是我可以使用Kotlin的最早的Android API级别?

    我认为这个问题很清楚但是我能在Kotlin上定位的最早API级别是什么?解决方法实际上,任何API级别.这是因为Kotlin被编译为JVM6平台的字节码,所有AndroidAPI级别都支持该字节码.因此,除非您在Kotlin代码中使用任何较新的AndroidAPI,否则它不需要任何特定的API级别.

  6. android – Kotlin数据类和可空类型

    我是Kotlin的新手,我不知道为什么编译器会抱怨这段代码:编译器抱怨测试?.data.length,它说我应该这样做:test?.length.但是数据变量是String,而不是String?,所以我不明白为什么我要把它?当我想检查长度.解决方法表达式test?.data部分可以为空:它是test.data或null.因此,获取其长度并不是零安全的,而是应该再次使用safecalloperator:test?.length.可空性通过整个调用链传播:你必须将这些链写成?.)).e),因为,如果其中一个左

  7. android – Kotlin自定义获取执行方法调用

    像这样的东西:仍在使用Kotlin并且不确定get()方法是否会引用编辑器而不是创建新的编辑器.解决方法第二个属性声明适合您的需要:它有一个customgetter,因此获取属性值将始终执行getter,并且不存储该值.你可能会被等号get()=…

  8. android – Kotlin合成扩展和几个包含相同的布局

    我找了一些这样的:我在Studio中看到我可以访问dayName但是dayNameTextView引用了哪一个?正常,如果我只有一个包含的布局,它工作正常.但现在我有多次包含相同的布局.我当然可以这样做:但我正在寻找好的解决方案.版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请发送邮件至dio@foxmail.com举报,一经查实,本站将立刻删除。

  9. android – java.lang.IllegalArgumentException:指定为非null的参数为null:方法kotlin.jvm.internal.Intrinsics.checkParameterIsNotNull

    我收到了这个错误java.lang.IllegalArgumentException:指定为非null的参数为null:方法kotlin.jvm.internal.Intrinsics.checkParameterIsNotNull,参数事件为线覆盖funonEditorAction(v:TextView,actionId:Int,event:KeyEvent)以下是整个代码.这段代码最初是在ja

  10. android – Kotlin:如何访问CustomView的Attrs

    我在Kotlin中创建了一个自定义视图,并希望访问它的属性资源.以下是我的代码请注意,这将在init函数的attrs中出错.我想知道如何进入attrs?

随机推荐

  1. Flutter 网络请求框架封装详解

    这篇文章主要介绍了Flutter 网络请求框架封装详解,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

  2. Android单选按钮RadioButton的使用详解

    今天小编就为大家分享一篇关于Android单选按钮RadioButton的使用详解,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧

  3. 解决android studio 打包发现generate signed apk 消失不见问题

    这篇文章主要介绍了解决android studio 打包发现generate signed apk 消失不见问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧

  4. Android 实现自定义圆形listview功能的实例代码

    这篇文章主要介绍了Android 实现自定义圆形listview功能的实例代码,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  5. 详解Android studio 动态fragment的用法

    这篇文章主要介绍了Android studio 动态fragment的用法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  6. Android用RecyclerView实现图标拖拽排序以及增删管理

    这篇文章主要介绍了Android用RecyclerView实现图标拖拽排序以及增删管理的方法,帮助大家更好的理解和学习使用Android,感兴趣的朋友可以了解下

  7. Android notifyDataSetChanged() 动态更新ListView案例详解

    这篇文章主要介绍了Android notifyDataSetChanged() 动态更新ListView案例详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下

  8. Android自定义View实现弹幕效果

    这篇文章主要为大家详细介绍了Android自定义View实现弹幕效果,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  9. Android自定义View实现跟随手指移动

    这篇文章主要为大家详细介绍了Android自定义View实现跟随手指移动,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  10. Android实现多点触摸操作

    这篇文章主要介绍了Android实现多点触摸操作,实现图片的放大、缩小和旋转等处理,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部