MQTT

MQTT是一个极其轻量级的发布/订阅消息传输协议,对于需要较小代码占用空间或网络带宽非常宝贵的远程连接非常有用

有如下特点:

  • 开放消息协议,简单易实现;
  • 发布订阅模式,一对多消息发布;
  • 基于TCP/IP网络连接,提供有序,无损,双向连接;
  • 1字节固定报头,2字节心跳报文,最小化传输开销和协议交换,有效减少网络流量;
  • 消息QoS支持,可靠传输保证。

添加依赖

        maven { url "https://repo.eclipse.org/content/repositories/paho-snapshots/" }

    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

首先,连接服务器,SERVER_HOST为主机名,CLIENT_ID为客户端唯一标识,还需要用户名和密码,当然,这些一般由服务器返回。

    fun connect() {
        try {
            //MemoryPersistence设置clientId的保存形式,默认为以内存保存
            client = MqttAsyncClient(SERVER_HOST, CLIENT_ID, MemoryPersistence())
            //MQTT连接设置
            options = MqttConnectOptions()
            with(options) {
                //是否清空session,true表示每次连接到服务器都以新的身份,false表示服务器会保留客户的连接记录
                isCleanSession = true
                //用户名和密码
                userName = USERNAME
                password = PASSWORD.toCharArray()
                //超时时间,单位是秒
                connectionTimeout = 30
                //会话心跳时间,单位是秒,服务器每隔30秒向客户端发送消息判断客户端是否在线
                keepAliveInterval = 30
            }
            //设置回调
            client!!.setCallback(PushCallBack())
            client!!.connect(options, context, iMqttActionListener)
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

心跳包:在空闲时间内,如果客户端没有其他任何报文发送,必须发送一个PINGREQ报文到服务器,而如果服务端在 1.5 倍心跳时间内没有收到客户端消息,则会主动断开客户端的连接,发送其遗嘱消息给所有订阅者,而服务端收到PINGREQ报文之后,立即返回PINGRESP报文给客户端。

如果两个客户端的 clientID 一样,那么服务端记录第一个客户端连接之后再收到第二个客户端连接请求,则会向第一个客户端发送 Disconnect 报文断开连接,并连接第二个客户端。

遗嘱消息:MqttConnectOptions的setWill方法可以设置遗嘱消息,客户端没有主动向服务端发起disconnect断开连接消息,但服务端检测到和客户端的连接已断开,此时服务端将该客户端设置的遗嘱消息发送出去,遗嘱Topic中不能存在通配符。

应用场景:客户端掉线之后,可以及时通知到所有订阅该遗嘱topic的客户端。

订阅消息。

    fun subscribeMessage(topic: String, qos: Int) {
        client?.let {
            try {
                it.subscribe(topic, qos)
            } catch (e: MqttException) {
                e.printStackTrace()
            }
        }
    }

MQTT是一种发布/订阅的消息协议, 通过设定的主题topic,发布者向topic发送的payload负载消息会经过服务器,转发到所有订阅该topic的订阅者。topic有两个通配符,“ ” 和 “#”,与 “/” 组合使用,“ ” 只能表示一级topic,“#” 可以表示任意层级,例如,订阅topic为 “guangdong/ ”,发布者发布的topic可以是 guangdong,guangdong/shenzhen,但是不能是guangdong/shenzhen/nanshan,而订阅的topic如果是 “guangdong/#” 则可以收到。

Qos为服务质量等级,qos=0,表示发送方只发一次,不管是否发成功,也不管接收方是否成功接收,适用于不重要的数据传输;qos=1,表示消息至少有一次到达接收方,发送方发送消息,需要等待接收方返回应答消息,如果发送方在一定时间内未收到应答,发送方将继续发送,直到收到应答消息,删除本地消息缓存,不再发送。适用于需要收到所有消息,客户端可以处理重复消息;qos = 2:确保消息只一次到达接收方,一般我们都会选择2。

发布消息

    fun publish(topic: String, msg: String, isRetained: Boolean, qos: Int) {
        try {
            client?.let {
                val message = MqttMessage()
                message.qos = qos
                message.isRetained = isRetained
                message.payload = msg.toByteArray()
                it.publish(topic, message)
            }
        } catch (e: MqttPersistenceException) {
            e.printStackTrace()
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

payload为负载消息,字节流类型,是 MQTT 通信传输的真实数据。retain是保留信息,服务端将保留对应topic最新的一条消息记录,保留消息的作用是每次客户端连上都会收到其topic的最后一条保留消息。

整个封装类如下:

class MQTTManager private constructor(private val context: Context) {
    private var client: MqttAsyncClient? = null
    private lateinit var options: MqttConnectOptions
    private val iMqttActionListener = object : IMqttActionListener {
        override fun onSuccess(asyncActionToken: IMqttToken?) {
            //连接成功,开始订阅
            subscribeMessage(TOPIC, 2)
        }
        override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
            //连接失败
        }
    }
    companion object {
        @Volatile
        private var instance: MQTTManager? = null
        fun getInstance(context: Context): MQTTManager = instance ?: synchronized(this) {
            instance ?: MQTTManager(context).also {
                instance = it
            }
        }
    }
    /**
     * 连接服务器
     */
    fun connect() {
        try {
            //MemoryPersistence设置clientId的保存形式,默认为以内存保存
            client = MqttAsyncClient(SERVER_HOST, CLIENT_ID, MemoryPersistence())
            //MQTT连接设置
            options = MqttConnectOptions()
            with(options) {
                //是否清空session,true表示每次连接到服务器都以新的身份,false表示服务器会保留客户的连接记录
                isCleanSession = true
                //用户名和密码
                userName = USERNAME
                password = PASSWORD.toCharArray()
                //超时时间,单位是秒
                connectionTimeout = 30
                //会话心跳时间,单位是秒,服务器每隔30秒向客户端发送消息判断客户端是否在线
                keepAliveInterval = 30
                //自动重连
                isAutomaticReconnect = true
            }
            //设置回调
            client!!.setCallback(PushCallBack())
            client!!.connect(options, context, iMqttActionListener)
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }
    /**
     * 订阅消息
     */
    fun subscribeMessage(topic: String, qos: Int) {
        client?.let {
            try {
                it.subscribe(topic, qos)
            } catch (e: MqttException) {
                e.printStackTrace()
            }
        }
    }
    /**
     * 发布消息
     */
    fun publish(topic: String, msg: String, isRetained: Boolean, qos: Int) {
        try {
            client?.let {
                val message = MqttMessage()
                message.qos = qos
                message.isRetained = isRetained
                message.payload = msg.toByteArray()
                it.publish(topic, message)
            }
        } catch (e: MqttPersistenceException) {
            e.printStackTrace()
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }
    /**
     * 断开连接
     */
    fun disconnect() {
        client?.takeIf {
            it.isConnected
        }?.let {
            try {
                it.disconnect()
                instance = null
            } catch (e: MqttException) {
                e.printStackTrace()
            }
        }
    }
    /**
     * 判断是否连接
     */
    fun isConnected() = if (client != null) client!!.isConnected else false
    inner class PushCallBack : MqttCallback {
        override fun connectionLost(cause: Throwable?) {
            //断开连接
        }
        override fun messageArrived(topic: String?, message: MqttMessage?) {
            //接收消息回调
        }
        override fun deliveryComplete(token: IMqttDeliveryToken?) {
            //发布消息完成后的回调
        }
    }
}

WebSocket

WebSocket是应用层的一种协议,是建立在TCP协议基础上的,主要特点就是全双工通信,允许服务器主动发送信息给客户端。

这里使用OKHTTP进行WebSocket开发。

class WebSocketManager private constructor() {
    private val client: OkHttpClient = OkHttpClient.Builder().writeTimeout(5, TimeUnit.SECONDS)
        .readTimeout(5, TimeUnit.SECONDS)
        .connectTimeout(5, TimeUnit.SECONDS)
        .build()
    private var webSocket: WebSocket? = null
    companion object {
        val instance: WebSocketManager by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) { WebSocketManager() }
    }
    fun connect(url: String) {
        webSocket?.cancel()
        val request = Request.Builder().url(url).build()
        webSocket = client.newWebSocket(request, object : WebSocketListener() {
            //连接成功后回调
            override fun onOpen(webSocket: WebSocket, response: Response) {
                super.onOpen(webSocket, response)
            }
            //服务器发送消息给客户端时回调
            override fun onMessage(webSocket: WebSocket, text: String) {
                super.onMessage(webSocket, text)
            }
            //服务器发送的byte类型消息
            override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
                super.onMessage(webSocket, bytes)
            }
            //服务器连接关闭中
            override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
                super.onClosing(webSocket, code, reason)
            }
            //服务器连接已关闭
            override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
                super.onClosed(webSocket, code, reason)
            }
            //服务器连接失败
            override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
                super.onFailure(webSocket, t, response)
            }
        })
    }
    //发送消息
    private fun sendMessage(message: String) {
        webSocket?.send(message)
    }
    private fun close(code: Int, reason: String) {
        webSocket?.close(code, reason)
    }
}

总结

WebSocket是一种简单的报文协议,着重解决客户端与服务端不能双向通信的问题。

MQTT是基于TCP的发布/订阅消息传输协议,客户端可以创建和订阅任意主题,并向主题发布或接收消息,此外,有许多为物联网优化的特性,比如服务质量等级Qos,层级主题,遗嘱信息等。

MQTT和WebSocket都是应用层协议,都使用TCP协议来确保可靠传输,都支持双向通信,都使用二进制编码。WebSocket更简单灵活,MQTT相对复杂,但功能强大,大家可以根据自己的使用场景来选择 。

到此这篇关于Android MQTT与WebSocket协议详细讲解的文章就介绍到这了,更多相关Android MQTT与WebSocket内容请搜索Devmax以前的文章或继续浏览下面的相关文章希望大家以后多多支持Devmax!

Android MQTT与WebSocket协议详细讲解的更多相关文章

  1. 五分钟学会HTML5的WebSocket协议

    这篇文章主要介绍了五分钟学会HTML5的WebSocket协议,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

  2. 前端监听websocket消息并实时弹出(实例代码)

    这篇文章主要介绍了前端监听websocket消息并实时弹出,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  3. html5 canvas合成海报所遇问题及解决方案总结

    这篇文章主要介绍了html5 canvas合成海报所遇问题及解决方案总结,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

  4. HTML5 WebSocket实现点对点聊天的示例代码

    这篇文章主要介绍了HTML5 WebSocket实现点对点聊天的示例代码的相关资料,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

  5. Html5 video标签视频的最佳实践

    这篇文章主要介绍了Html5 video标签视频的最佳实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

  6. HTML5在微信内置浏览器下右上角菜单的调整字体导致页面显示错乱的问题

    HTML5在微信内置浏览器下,在右上角菜单的调整字体导致页面显示错乱的问题,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧

  7. ios – containerURLForSecurityApplicationGroupIdentifier:在iPhone和Watch模拟器上给出不同的结果

    我使用默认的XCode模板创建了一个WatchKit应用程序.我向iOSTarget,WatchkitAppTarget和WatchkitAppExtensionTarget添加了应用程序组权利.(这是应用程序组名称:group.com.lombax.fiveminutes)然后,我尝试使用iOSApp和WatchKitExtension访问共享文件夹URL:延期:iOS应用:但是,测试NSURL

  8. Ionic – Splash Screen适用于iOS,但不适用于Android

    我有一个离子应用程序,其中使用CLI命令离子资源生成的启动画面和图标iOS版本与正在渲染的启动画面完美配合,但在Android版本中,只有在加载应用程序时才会显示白屏.我检查了config.xml文件,所有路径看起来都是正确的,生成的图像出现在相应的文件夹中.(我使用了splash.psd模板来生成它们.我错过了什么?这是config.xml文件供参考,我觉得我在这里做错了–解决方法在config.xml中添加以下键:它对我有用!

  9. ios – Websockets可以在移动电话上工作吗?

    相关地,我怀疑长轮询客户端可能是实现类似功能的好方法,但我想知道我可能遇到的移动特定问题.到目前为止,我已经读过长时间的轮询请求可能会对电池寿命产生相当大的影响.我还听说iOS以某种方式限制了对单个服务器的连接数量,这可能是个问题.有没有人在使用实时组件的移动应用程序上工作?

  10. ios – 无法启动iPhone模拟器

    /Library/Developer/CoreSimulator/Devices/530A44CB-5978-4926-9E91-E9DBD5BFB105/data/Containers/Bundle/Application/07612A5C-659D-4C04-ACD3-D211D2830E17/ProductName.app/ProductName然后,如果您在Xcode构建设置中选择标准体系结构并再次构建和运行,则会产生以下结果:dyld:lazysymbolbindingFailed:Symbol

随机推荐

  1. Flutter 网络请求框架封装详解

    这篇文章主要介绍了Flutter 网络请求框架封装详解,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

  2. Android单选按钮RadioButton的使用详解

    今天小编就为大家分享一篇关于Android单选按钮RadioButton的使用详解,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧

  3. 解决android studio 打包发现generate signed apk 消失不见问题

    这篇文章主要介绍了解决android studio 打包发现generate signed apk 消失不见问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧

  4. Android 实现自定义圆形listview功能的实例代码

    这篇文章主要介绍了Android 实现自定义圆形listview功能的实例代码,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  5. 详解Android studio 动态fragment的用法

    这篇文章主要介绍了Android studio 动态fragment的用法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  6. Android用RecyclerView实现图标拖拽排序以及增删管理

    这篇文章主要介绍了Android用RecyclerView实现图标拖拽排序以及增删管理的方法,帮助大家更好的理解和学习使用Android,感兴趣的朋友可以了解下

  7. Android notifyDataSetChanged() 动态更新ListView案例详解

    这篇文章主要介绍了Android notifyDataSetChanged() 动态更新ListView案例详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下

  8. Android自定义View实现弹幕效果

    这篇文章主要为大家详细介绍了Android自定义View实现弹幕效果,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  9. Android自定义View实现跟随手指移动

    这篇文章主要为大家详细介绍了Android自定义View实现跟随手指移动,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  10. Android实现多点触摸操作

    这篇文章主要介绍了Android实现多点触摸操作,实现图片的放大、缩小和旋转等处理,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部