IOT开发---Android MQTT使用
创始人
2024-06-02 10:13:22
0

目录

  • MQTT介绍
    • MQTT通信模型
    • MQTT客户端
    • MQTT服务端
  • Android使用MQTT
    • 集成MQTT库
    • 定义MQTT管理者
    • 定义消息实体

MQTT介绍

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议。

该协议构建于TCP/IP协议上,它的设计思想是轻巧、开放、 简单、规范,易于实现。这些特点使得它对很多场景来说都是很好的选择,特别是对于受限的环境如机器与机器的通信(M2M)以及物联网环境(IoT)。

MQTT通信模型

MQTT 协议提供一对多的消息发布,可以降低应用程序的耦合性,用户只需要编写极少量的应用代码就能完成一对多的消息发布与订阅,该协议是基于<客户端-服务器>模型,在协议中主要有三种身份:发布者(Publisher)、服务器(Broker)以及订阅者(Subscriber)。

其中,MQTT消息的发布者和订阅者都是客户端,服务器只是作为一个中转的存在,将发布者发布的消息进行转发给所有订阅该主题的订阅者;发布者可以发布在其权限之内的所有主题,并且消息发布者可以同时是订阅者,实现了生产者与消费者的脱耦,发布的消息可以同时被多个订阅者订阅。

MQTT通信模型示意图如下:
在这里插入图片描述

MQTT客户端

MQTT 客户端可以向服务端发布信息,也可以从服务端收取信息。我们把客户端发送信息的行为称为 “发布”信息。客户端要想从服务端收取信息,则首先要向服务端“订阅”信息。

客户端具体功能如下:
1.发布消息给其它相关的客户端。
2.订阅主题请求接收相关的应用消息。
3.取消订阅主题请求移除接收应用消息。
4.从服务端终止连接。

MQTT服务端

MQTT 服务端通常是一台服务器(broker),它是 MQTT 信息传输的枢纽,负责将 MQTT 客户端发送来的信息传递给 MQTT 客户端。MQTT 服务端还负责管理 MQTT 客户端,以确保客户端之间的通讯顺畅,保证 MQTT 信息得以正确接收和准确投递。

MQTT 服务器位于消息发布者和订阅者之间,以便用于接收消息并发送到订阅者之中,它的功能有:
1.接受来自客户端的网络连接请求。
2.接受客户端发布的应用消息。
3.处理客户端的订阅和取消订阅请求。
4.转发应用消息给符合条件的已订阅客户端(包括发布者自身)。

Android使用MQTT

集成MQTT库

MQTT有不同语言、不同版本的诸多实现,其中Eclipse Paho只是诸多Java实现中的一个。

我们将使用Eclipse Paho Java Client作为客户端,它是 Java 语言中使用最广泛的 MQTT 客户端库。

集成步骤:

  1. 在Android工程的bulid.gradle(:app) 文件中添加依赖包
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0'

org.eclipse.paho也实现了一套针对Android端的通讯服务框架https://github.com/eclipse/paho.mqtt.android

不少开发者直接引用这个库:

implementation ‘org.eclipse.paho:org.eclipse.paho.android.service:1.1.1’

在Android 8.0以前这样做是没问题的,8.0以后Android Service行为发生了很大变更,需要进行适配,不然会出现异常。但是这个库的维护人员貌似对Android版本适配不是很积极,鉴于此,我们把库的源码下载下来,对源码进行改造,作为一个库工程使用:

implementation project(':org.eclipse.paho.android.service')
  1. 添加权限

  1. 注册Service

定义MQTT管理者

该类设计为单例模式,实现MQTT初始化、连接、订阅消息、发布消息、处理消息、释放连接等操作。

初始化包括创建MqttAndroidClient对象,并设置回调接口,针对连接失败的情况作断线重连的尝试,针对接收的消息进行JSON解析、并重新封装成需要的数据内容,通过EventBus将消息对象抛出去,相关业务模块注册EventBus并接收该对应消息,然后进行处理。

连接主要是设置连接相关参数和针对连接结果的处理,参数有连接认证校验、设置超时时间、设置心跳包发送间隔、设置用户名和密码。

/*** Created by ZhangJun on 2019/1/3.*/
class MqttManager private constructor() {private var mqttAndroidClient: MqttAndroidClient? = nullprivate lateinit var mqttConnectOptions: MqttConnectOptionsprivate object MqttManagerHolder {val INSTANCE = MqttManager()}fun init(node: String, port: Int, clientId: String) {try {if (mqttAndroidClient == null) {mqttAndroidClient = MqttAndroidClient(XxApplication.instance, "ssl://$node:$port", clientId)} else {mqttAndroidClient!!.setCallback(object : MqttCallbackExtended {override fun connectComplete(reconnect: Boolean, serverURI: String?) {LogUtils.d(TAG, "mqtt connectComplete reconnect = $reconnect")}override fun connectionLost(cause: Throwable?) {if (cause != null) {LogUtils.d(TAG, "mqtt connectionLost cause = " + cause.message)}connect()}@Throws(Exception::class)override fun messageArrived(topic: String, message: MqttMessage) {val str = String(message.payload)LogUtils.d(TAG, "messageArrived str = $str")val jsonObject = JSONObject(str)val event = jsonObject.optJSONObject("event")val header = event.optJSONObject("header")val namespace = header.optString("namespace")val name = header.optString("name")val payload = event.optJSONObject("payload")val message1 = MqttMessageBean()message1.messageId = namespace.plus(name)message1.messageContent = payloadEventBus.getDefault().post(ServerEvent.MqttMessageEvent(message1))}override fun deliveryComplete(token: IMqttDeliveryToken) {//do nothing}})mqttConnectOptions = MqttConnectOptions()mqttConnectOptions.socketFactory = sslSocketFactorymqttConnectOptions.isAutomaticReconnect = truemqttConnectOptions.isCleanSession = false// 设置超时时间,单位:秒mqttConnectOptions.connectionTimeout = 10// 心跳包发送间隔,单位:秒mqttConnectOptions.keepAliveInterval = 20// 用户名mqttConnectOptions.userName = CommonUtils.decryptToken()// 密码mqttConnectOptions.password = XxApplication.instance.packageName.toCharArray()connect()}} catch (ex: Exception) {ex.printStackTrace()}}private fun connect() {if (mqttAndroidClient != null && !mqttAndroidClient!!.isConnected) {mqttAndroidClient!!.connect(mqttConnectOptions, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken) {val disconnectedBufferOptions = DisconnectedBufferOptions()disconnectedBufferOptions.bufferSize = 100disconnectedBufferOptions.isBufferEnabled = truedisconnectedBufferOptions.isPersistBuffer = falsedisconnectedBufferOptions.isDeleteOldestMessages = falsemqttAndroidClient!!.setBufferOpts(disconnectedBufferOptions)}override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {LogUtils.d(TAG, " mqtt connect fail exception = " + exception.message)}})}}private val sslSocketFactory: SSLSocketFactoryget() {try {val sslContext = SSLContext.getInstance("SSL")sslContext.init(null, trustManager, SecureRandom())return sslContext.socketFactory} catch (e: Exception) {throw RuntimeException(e)}}private val trustManager: Arrayget() = arrayOf(object : X509TrustManager {override fun checkClientTrusted(chain: Array, authType: String) {//do nothing}override fun checkServerTrusted(chain: Array, authType: String) {//do nothing}override fun getAcceptedIssuers(): Array {return arrayOf()}})/*** 订阅消息*/fun subscribeTopic(subTopic: String, qos: Int) {try {if (mqttAndroidClient != null && mqttAndroidClient!!.isConnected) {mqttAndroidClient!!.subscribe(subTopic, qos)}} catch (ex: MqttException) {System.err.println("Exception while subscribing")ex.printStackTrace()}}/*** 发布消息*/fun publishMessage(pubTopic: String, qos: Int, content: String) {try {if (mqttAndroidClient != null && mqttAndroidClient!!.isConnected) {mqttAndroidClient!!.publish(pubTopic, content.toByteArray(), qos, false)}} catch (e: MqttException) {System.err.println("Error Publishing: " + e.message)e.printStackTrace()}}fun release() {try {if (mqttAndroidClient != null) {mqttAndroidClient!!.unregisterResources()if (mqttAndroidClient!!.isConnected) {mqttAndroidClient!!.disconnect()}mqttAndroidClient!!.close()mqttAndroidClient = null}} catch (e: Exception) {e.printStackTrace()}}companion object {private val TAG = MqttManager::class.java.simpleNameval instance: MqttManagerget() = MqttManagerHolder.INSTANCE}
}

定义消息实体

/*** Created by ZhangJun on 2019/1/5.*/
class MqttMessageBean {var messageId: String = ""var messageContent: JSONObject = JSONObject()
}

相关内容

热门资讯

生活的快乐_叙事作文300字... 生活的快乐_叙事作文300字_三年级写事作文 篇一生活的快乐今天,我想和大家分享一件让我快乐的事情。...
三年级游植物园记作文350字... 三年级游植物园记作文350字6篇 篇一:难忘的植物园之旅今天,我们三年级的小朋友们去了植物园参观。一...
三年级作文大全新学期的变化(... 三年级作文大全新学期的变化 篇一新学期的变化新学期开始了,我迎来了充满变化的三年级生活。和以前相比,...
寒假二三事三年级作文【精选3... 寒假二三事三年级作文 篇一快乐的寒假时光今年的寒假真是过得太快了,转眼间就要开学了。回顾这个寒假,我...
三年级作文350字关于动物【... 三年级作文350字关于动物 篇一我喜欢的动物我喜欢的动物是小狗。小狗有着可爱的外形和活泼的性格,让人...
公园中一天景色的变化作文三年... 公园中一天景色的变化作文三年级 篇一公园中一天景色的变化早晨,当阳光洒在公园上,天空变得湛蓝,白云朵...
三年级作文我是爸爸妈妈小帮手... 三年级作文我是爸爸妈妈小帮手 篇一我是爸爸妈妈小帮手我是一个三年级的小学生,虽然我还小,但我一直都很...
写博物馆的作文三年级【通用6... 写博物馆的作文三年级 篇一博物馆是一个神奇的地方,我最喜欢去博物馆看展览了。上周末,我和爸爸妈妈一起...
三年级我的妈妈的作文300字... 三年级我的妈妈的作文300字 篇一我亲爱的妈妈妈妈,您是我最亲爱的人,也是我最敬爱的人。在我成长的道...
小学三年级写景作文(经典6篇... 小学三年级写景作文 篇一美丽的春天春天来了,大自然仿佛换了一幅崭新的画,到处都是一片生机勃勃的景象。...
我班的活雷锋三年级作文【通用... 我班的活雷锋三年级作文 篇一我们班的小雷锋在我们班级里,有一个特别的小朋友,他就是我们的活雷锋。他叫...
海边作文300字三年级(精选... 海边作文300字三年级 篇一我喜欢海边,因为海边有美丽的沙滩、清澈的海水和各种有趣的海洋生物。每年暑...
三年级作文上学的路上开头【实... 三年级作文上学的路上开头 篇一一大早,天空还是微微泛着蓝色的时候,我便背上了书包,踏上了上学的征程。...
不起眼的主角作文【优选3篇】 不起眼的主角作文 篇一《小草的故事》有一天,我在花园里散步,突然发现一株小草。它生长在花园的角落,显...
家乡的环境作文三年级300字... 家乡的环境作文三年级300字20篇 篇一标题:我家乡的美丽环境我家乡是一个美丽的小镇,它位于山脚下,...
爸爸的作文【优质6篇】 爸爸的作文 篇一爸爸的作文今天是我第一次写作文,我选择写一篇关于我的爸爸的作文。爸爸是我心目中最伟大...
参观洛阳周王城天子驾六博物馆... 参观洛阳周王城天子驾六博物馆三年级作文 篇一我去洛阳参观了周王城天子驾六博物馆,感受到了历史的厚重和...
关于老师的一件事200字作文... 关于老师的一件事200字作文三年级作文 篇一今天是我上三年级的第一天,我迫不及待地走进了教室,想要见...
香水百合(通用4篇) 香水百合 篇一:追寻芬芳之旅香水百合,一种散发着迷人芬芳的花朵,常常被人们用来制作香水。它的美丽和独...
我们是一家人学生作文500字... 我们是一家人学生作文500字 篇一我们是一家人家,是一个温暖的港湾,是一个永远的归宿。在这个家庭中,...