引言

特别提醒: 文中提到的MQTT服务器Apache-Apollo,现在已经不维护。但是客户端的写法是通用的。目前我常用的是RabbitMQ加mqtt插件。

MQTT

MQTT(消息队列遥测传输)是ISO标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议。国内很多企业都广泛使用MQTT作为Android手机客户端与服务器端推送消息的协议。

特点

MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:

  • 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
  • 对负载内容屏蔽的消息传输;
  • 使用TCP/IP提供网络连接;
  • 有三种消息发布服务质量;

至多一次:消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。

至少一次:确保消息到达,但消息重复可能会发生。

只有一次:确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。

  • 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;
  • 使用Last Will和Testament特性通知有关各方客户端异常中断的机制。

Apache-Apollo

Apache Apollo是一个代理服务器,其是在ActiveMQ基础上发展而来的,可以支持STOMP, AMQP, MQTT, Openwire, SSL, WebSockets 等多种协议。

原理:服务器端创建一个唯一订阅号,发送者可以向这个订阅号中发东西,然后接受者(即订阅了这个订阅号的人)都会收到这个订阅号发出来的消息。以此来完成消息的推送。服务器其实是一个消息中转站。

下载

下载地址:http://archive.apache.org/dist/activemq/activemq-apollo/

配置与启动

  • 需要安装JDK环境
  • 在命令行模式下进入bin,执行apollo create mybroker d:\apache-apollo\broker,创建一个名为mybroker虚拟主机(Virtual Host)。需要特别注意的是,生成的目录就是以后真正启动程序的位置。
  • 在命令行模式下进入d:\apache-apollo\broker\bin,执行apollo-broker run,也可以用apollo-broker-service.exe配置服务。
  • 访问http://127.0.0.1:61680打开web管理界面。(密码查看broker/etc/users.properties)
  • 启动端口,看cmd输出。

SpringBoot2的开发

添加依赖

<!-- 
  spring-boot版本 2.1.0.RELEASE
-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-mqtt</artifactId>
</dependency>

自定义配置

# src/main/resources/config/mqtt.properties
##################
#  MQTT 配置
##################
# 用户名
mqtt.username=admin
# 密码
mqtt.password=password
# 推送信息的连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:61613,tcp://192.168.1.61:61613
mqtt.url=tcp://127.0.0.1:61613
##################
#  MQTT 生产者
##################
# 连接服务器默认客户端ID
mqtt.producer.clientId=mqttProducer
# 默认的推送主题,实际可在调用接口时指定
mqtt.producer.defaultTopic=topic1
##################
#  MQTT 消费者
##################
# 连接服务器默认客户端ID
mqtt.consumer.clientId=mqttConsumer
# 默认的接收主题,可以订阅多个Topic,逗号分隔
mqtt.consumer.defaultTopic=topic1

配置MQTT发布和订阅

import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
/**
 * MQTT配置,生产者
 *
 * @author BBF
 */
@Configuration
public class MqttConfig {
  private static final Logger LOGGER = LoggerFactory.getLogger(MqttConfig.class);
  private static final byte[] WILL_DATA;
  static {
    WILL_DATA = "offline".getBytes();
  }
  /**
   * 订阅的bean名称
   */
  public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
  /**
   * 发布的bean名称
   */
  public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
  @Value("${mqtt.username}")
  private String username;
  @Value("${mqtt.password}")
  private String password;
  @Value("${mqtt.url}")
  private String url;
  @Value("${mqtt.producer.clientId}")
  private String producerClientId;
  @Value("${mqtt.producer.defaultTopic}")
  private String producerDefaultTopic;
  @Value("${mqtt.consumer.clientId}")
  private String consumerClientId;
  @Value("${mqtt.consumer.defaultTopic}")
  private String consumerDefaultTopic;
  /**
   * MQTT连接器选项
   *
   * @return {@link org.eclipse.paho.client.mqttv3.MqttConnectOptions}
   */
  @Bean
  public MqttConnectOptions getMqttConnectOptions() {
    MqttConnectOptions options = new MqttConnectOptions();
    // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
    // 这里设置为true表示每次连接到服务器都以新的身份连接
    options.setCleanSession(true);
    // 设置连接的用户名
    options.setUserName(username);
    // 设置连接的密码
    options.setPassword(password.toCharArray());
    options.setServerURIs(StringUtils.split(url, ","));
    // 设置超时时间 单位为秒
    options.setConnectionTimeout(10);
    // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
    options.setKeepAliveInterval(20);
    // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
    options.setWill("willTopic", WILL_DATA, 2, false);
    return options;
  }
  /**
   * MQTT客户端
   *
   * @return {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}
   */
  @Bean
  public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setConnectionOptions(getMqttConnectOptions());
    return factory;
  }
  /**
   * MQTT信息通道(生产者)
   *
   * @return {@link org.springframework.messaging.MessageChannel}
   */
  @Bean(name = CHANNEL_NAME_OUT)
  public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
  }
  /**
   * MQTT消息处理器(生产者)
   *
   * @return {@link org.springframework.messaging.MessageHandler}
   */
  @Bean
  @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
  public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
        producerClientId,
        mqttClientFactory());
    messageHandler.setAsync(true);
    messageHandler.setDefaultTopic(producerDefaultTopic);
    return messageHandler;
  }
  /**
   * MQTT消息订阅绑定(消费者)
   *
   * @return {@link org.springframework.integration.core.MessageProducer}
   */
  @Bean
  public MessageProducer inbound() {
    // 可以同时消费(订阅)多个Topic
    MqttPahoMessageDrivenChannelAdapter adapter =
        new MqttPahoMessageDrivenChannelAdapter(
            consumerClientId, mqttClientFactory(),
            StringUtils.split(consumerDefaultTopic, ","));
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    // 设置订阅通道
    adapter.setOutputChannel(mqttInboundChannel());
    return adapter;
  }
  /**
   * MQTT信息通道(消费者)
   *
   * @return {@link org.springframework.messaging.MessageChannel}
   */
  @Bean(name = CHANNEL_NAME_IN)
  public MessageChannel mqttInboundChannel() {
    return new DirectChannel();
  }
  /**
   * MQTT消息处理器(消费者)
   *
   * @return {@link org.springframework.messaging.MessageHandler}
   */
  @Bean
  @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
  public MessageHandler handler() {
    return new MessageHandler() {
      @Override
      public void handleMessage(Message<?> message) throws MessagingException {
        LOGGER.error("===================={}============", message.getPayload());
      }
    };
  }
}

消息发布器

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
 * MQTT生产者消息发送接口
 * <p>MessagingGateway要指定生产者的通道名称</p>
 * @author BBF
 */
@Component
@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
public interface IMqttSender {
  /**
   * 发送信息到MQTT服务器
   *
   * @param data 发送的文本
   */
  void sendToMqtt(String data);
  /**
   * 发送信息到MQTT服务器
   *
   * @param topic 主题
   * @param payload 消息主体
   */
  void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
      String payload);
  /**
   * 发送信息到MQTT服务器
   *
   * @param topic 主题
   * @param qos 对消息处理的几种机制。
 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。

   * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。

   * 2 多了一次去重的动作,确保订阅者收到的消息有一次。
   * @param payload 消息主体
   */
  void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
      @Header(MqttHeaders.QOS) int qos,
      String payload);
}

发送消息

/**
 * MQTT消息发送
 *
 * @author BBF
 */
@Controller
@RequestMapping(value = "/")
public class MqttController {
  /**
   * 注入发送MQTT的Bean
   */
  @Resource
  private IMqttSender iMqttSender;
  /**
   * 发送MQTT消息
   * @param message 消息内容
   * @return 返回
   */
  @ResponseBody
  @GetMapping(value = "/mqtt", produces ="text/html")
  public ResponseEntity<String> sendMqtt(@RequestParam(value = "msg") String message) {
    iMqttSender.sendToMqtt(message);
    return new ResponseEntity<>("OK", HttpStatus.OK);
  }
}

入口类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.context.annotation.PropertySource;
/**
 * SpringBoot 入口类
 *
 * @author BBF
 */
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class,
    HibernateJpaAutoConfiguration.class})
@PropertySource(encoding = "UTF-8", value = {"classpath:config/mqtt.properties"})
public class Application {
  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }
}

代码

https://gitee.com/bbfbbf/mqtt-test

以上就是SpringBoot集成MQTT示例详解的详细内容,更多关于SpringBoot集成MQTT的资料请关注Devmax其它相关文章!

SpringBoot集成MQTT示例详解的更多相关文章

  1. ios – OSX上的AWS MQTT

    解决方法由于SSL握手问题,它是失败的.它正在检测到一个无效的证书.报告和解决了类似的问题here,引用相同的错误代码.由于p12文件中有多个身份,该问题被追溯到身份不匹配.在这种情况下,p12文件中有两个证书,但代码只读取第一个.我建议倾销.p12文件的内容,并确认证书.发布在这里审查.

  2. Paho MQTT Android服务问题

    我正在开发的应用程序中实现PahoMQTTAndroid服务.在测试了Paho提供的示例应用程序之后,我发现有一些事情我想改变.https://eclipse.org/paho/clients/android/应用程序完全关闭后,应用程序服务似乎将关闭.我希望在应用程序关闭的情况下保持服务运行,如果有更多消息进来.我也在寻找一种方法,一旦收到新消息就打开应用程序到特定活动.这是在消息到达时调用的回

  3. Paho MqttAndroidClient.connect总是失败

    我想将消息从android服务发布到本地服务器.这是我的代码的一部分,最简单的形式基于here的片段.但总是调用onFailure函数,我得到错误:显然由库返回,因为’listener!非常感谢.解决方法对我来说同样的问题,在我的情况下问题是标签位于之外标签.一开始我有这个:然后我改为:一切正常!

  4. SpringBoot本地磁盘映射问题

    这篇文章主要介绍了SpringBoot本地磁盘映射问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

  5. java SpringBoot 分布式事务的解决方案(JTA+Atomic+多数据源)

    这篇文章主要介绍了java SpringBoot 分布式事务的解决方案(JTA+Atomic+多数据源),文章围绕主题展开详细的内容介绍,具有一定的参考价值,感兴趣的小伙伴可以参考一下

  6. SpringBoot整合Javamail实现邮件发送的详细过程

    日常开发过程中,我们经常需要使用到邮件发送任务,比方说验证码的发送、日常信息的通知等,下面这篇文章主要给大家介绍了关于SpringBoot整合Javamail实现邮件发送的详细过程,需要的朋友可以参考下

  7. SpringBoot详细讲解视图整合引擎thymeleaf

    这篇文章主要分享了Spring Boot整合使用Thymeleaf,Thymeleaf是新一代的Java模板引擎,类似于Velocity、FreeMarker等传统引擎,关于其更多相关内容,需要的小伙伴可以参考一下

  8. Springboot集成mybatis实现多数据源配置详解流程

    在日常开发中,若遇到多个数据源的需求,怎么办呢?通过springboot集成mybatis实现多数据源配置,简单尝试一下,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

  9. SpringBoot使用Minio进行文件存储的实现

    本文主要介绍了SpringBoot使用Minio进行文件存储的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

  10. 解析SpringBoot中使用LoadTimeWeaving技术实现AOP功能

    这篇文章主要介绍了SpringBoot中使用LoadTimeWeaving技术实现AOP功能,AOP面向切面编程,通过为目标类织入切面的方式,实现对目标类功能的增强,本文给大家介绍的非常详细,需要的朋友可以参考下

随机推荐

  1. 基于EJB技术的商务预订系统的开发

    用EJB结构开发的应用程序是可伸缩的、事务型的、多用户安全的。总的来说,EJB是一个组件事务监控的标准服务器端的组件模型。基于EJB技术的系统结构模型EJB结构是一个服务端组件结构,是一个层次性结构,其结构模型如图1所示。图2:商务预订系统的构架EntityBean是为了现实世界的对象建造的模型,这些对象通常是数据库的一些持久记录。

  2. Java利用POI实现导入导出Excel表格

    这篇文章主要为大家详细介绍了Java利用POI实现导入导出Excel表格,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  3. Mybatis分页插件PageHelper手写实现示例

    这篇文章主要为大家介绍了Mybatis分页插件PageHelper手写实现示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  4. (jsp/html)网页上嵌入播放器(常用播放器代码整理)

    网页上嵌入播放器,只要在HTML上添加以上代码就OK了,下面整理了一些常用的播放器代码,总有一款适合你,感兴趣的朋友可以参考下哈,希望对你有所帮助

  5. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  6. Java异常Exception详细讲解

    异常就是不正常,比如当我们身体出现了异常我们会根据身体情况选择喝开水、吃药、看病、等 异常处理方法。 java异常处理机制是我们java语言使用异常处理机制为程序提供了错误处理的能力,程序出现的错误,程序可以安全的退出,以保证程序正常的运行等

  7. Java Bean 作用域及它的几种类型介绍

    这篇文章主要介绍了Java Bean作用域及它的几种类型介绍,Spring框架作为一个管理Bean的IoC容器,那么Bean自然是Spring中的重要资源了,那Bean的作用域又是什么,接下来我们一起进入文章详细学习吧

  8. 面试突击之跨域问题的解决方案详解

    跨域问题本质是浏览器的一种保护机制,它的初衷是为了保证用户的安全,防止恶意网站窃取数据。那怎么解决这个问题呢?接下来我们一起来看

  9. Mybatis-Plus接口BaseMapper与Services使用详解

    这篇文章主要为大家介绍了Mybatis-Plus接口BaseMapper与Services使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  10. mybatis-plus雪花算法增强idworker的实现

    今天聊聊在mybatis-plus中引入分布式ID生成框架idworker,进一步增强实现生成分布式唯一ID,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部