IOT开发---Android MQTT使用
创始人
2024-06-02 10:13:22
0

目录

  • MQTT介绍
    • MQTT通信模型
    • MQTT客户端
    • MQTT服务端
  • Android使用MQTT
    • 集成MQTT库
    • 定义MQTT管理者
    • 定义消息实体

MQTT介绍

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议。

该协议构建于TCP/IP协议上,它的设计思想是轻巧、开放、 简单、规范,易于实现。这些特点使得它对很多场景来说都是很好的选择,特别是对于受限的环境如机器与机器的通信(M2M)以及物联网环境(IoT)。

MQTT通信模型

MQTT 协议提供一对多的消息发布,可以降低应用程序的耦合性,用户只需要编写极少量的应用代码就能完成一对多的消息发布与订阅,该协议是基于<客户端-服务器>模型,在协议中主要有三种身份:发布者(Publisher)、服务器(Broker)以及订阅者(Subscriber)。

其中,MQTT消息的发布者和订阅者都是客户端,服务器只是作为一个中转的存在,将发布者发布的消息进行转发给所有订阅该主题的订阅者;发布者可以发布在其权限之内的所有主题,并且消息发布者可以同时是订阅者,实现了生产者与消费者的脱耦,发布的消息可以同时被多个订阅者订阅。

MQTT通信模型示意图如下:
在这里插入图片描述

MQTT客户端

MQTT 客户端可以向服务端发布信息,也可以从服务端收取信息。我们把客户端发送信息的行为称为 “发布”信息。客户端要想从服务端收取信息,则首先要向服务端“订阅”信息。

客户端具体功能如下:
1.发布消息给其它相关的客户端。
2.订阅主题请求接收相关的应用消息。
3.取消订阅主题请求移除接收应用消息。
4.从服务端终止连接。

MQTT服务端

MQTT 服务端通常是一台服务器(broker),它是 MQTT 信息传输的枢纽,负责将 MQTT 客户端发送来的信息传递给 MQTT 客户端。MQTT 服务端还负责管理 MQTT 客户端,以确保客户端之间的通讯顺畅,保证 MQTT 信息得以正确接收和准确投递。

MQTT 服务器位于消息发布者和订阅者之间,以便用于接收消息并发送到订阅者之中,它的功能有:
1.接受来自客户端的网络连接请求。
2.接受客户端发布的应用消息。
3.处理客户端的订阅和取消订阅请求。
4.转发应用消息给符合条件的已订阅客户端(包括发布者自身)。

Android使用MQTT

集成MQTT库

MQTT有不同语言、不同版本的诸多实现,其中Eclipse Paho只是诸多Java实现中的一个。

我们将使用Eclipse Paho Java Client作为客户端,它是 Java 语言中使用最广泛的 MQTT 客户端库。

集成步骤:

  1. 在Android工程的bulid.gradle(:app) 文件中添加依赖包
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0'

org.eclipse.paho也实现了一套针对Android端的通讯服务框架https://github.com/eclipse/paho.mqtt.android

不少开发者直接引用这个库:

implementation ‘org.eclipse.paho:org.eclipse.paho.android.service:1.1.1’

在Android 8.0以前这样做是没问题的,8.0以后Android Service行为发生了很大变更,需要进行适配,不然会出现异常。但是这个库的维护人员貌似对Android版本适配不是很积极,鉴于此,我们把库的源码下载下来,对源码进行改造,作为一个库工程使用:

implementation project(':org.eclipse.paho.android.service')
  1. 添加权限

  1. 注册Service

定义MQTT管理者

该类设计为单例模式,实现MQTT初始化、连接、订阅消息、发布消息、处理消息、释放连接等操作。

初始化包括创建MqttAndroidClient对象,并设置回调接口,针对连接失败的情况作断线重连的尝试,针对接收的消息进行JSON解析、并重新封装成需要的数据内容,通过EventBus将消息对象抛出去,相关业务模块注册EventBus并接收该对应消息,然后进行处理。

连接主要是设置连接相关参数和针对连接结果的处理,参数有连接认证校验、设置超时时间、设置心跳包发送间隔、设置用户名和密码。

/*** Created by ZhangJun on 2019/1/3.*/
class MqttManager private constructor() {private var mqttAndroidClient: MqttAndroidClient? = nullprivate lateinit var mqttConnectOptions: MqttConnectOptionsprivate object MqttManagerHolder {val INSTANCE = MqttManager()}fun init(node: String, port: Int, clientId: String) {try {if (mqttAndroidClient == null) {mqttAndroidClient = MqttAndroidClient(XxApplication.instance, "ssl://$node:$port", clientId)} else {mqttAndroidClient!!.setCallback(object : MqttCallbackExtended {override fun connectComplete(reconnect: Boolean, serverURI: String?) {LogUtils.d(TAG, "mqtt connectComplete reconnect = $reconnect")}override fun connectionLost(cause: Throwable?) {if (cause != null) {LogUtils.d(TAG, "mqtt connectionLost cause = " + cause.message)}connect()}@Throws(Exception::class)override fun messageArrived(topic: String, message: MqttMessage) {val str = String(message.payload)LogUtils.d(TAG, "messageArrived str = $str")val jsonObject = JSONObject(str)val event = jsonObject.optJSONObject("event")val header = event.optJSONObject("header")val namespace = header.optString("namespace")val name = header.optString("name")val payload = event.optJSONObject("payload")val message1 = MqttMessageBean()message1.messageId = namespace.plus(name)message1.messageContent = payloadEventBus.getDefault().post(ServerEvent.MqttMessageEvent(message1))}override fun deliveryComplete(token: IMqttDeliveryToken) {//do nothing}})mqttConnectOptions = MqttConnectOptions()mqttConnectOptions.socketFactory = sslSocketFactorymqttConnectOptions.isAutomaticReconnect = truemqttConnectOptions.isCleanSession = false// 设置超时时间,单位:秒mqttConnectOptions.connectionTimeout = 10// 心跳包发送间隔,单位:秒mqttConnectOptions.keepAliveInterval = 20// 用户名mqttConnectOptions.userName = CommonUtils.decryptToken()// 密码mqttConnectOptions.password = XxApplication.instance.packageName.toCharArray()connect()}} catch (ex: Exception) {ex.printStackTrace()}}private fun connect() {if (mqttAndroidClient != null && !mqttAndroidClient!!.isConnected) {mqttAndroidClient!!.connect(mqttConnectOptions, null, object : IMqttActionListener {override fun onSuccess(asyncActionToken: IMqttToken) {val disconnectedBufferOptions = DisconnectedBufferOptions()disconnectedBufferOptions.bufferSize = 100disconnectedBufferOptions.isBufferEnabled = truedisconnectedBufferOptions.isPersistBuffer = falsedisconnectedBufferOptions.isDeleteOldestMessages = falsemqttAndroidClient!!.setBufferOpts(disconnectedBufferOptions)}override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {LogUtils.d(TAG, " mqtt connect fail exception = " + exception.message)}})}}private val sslSocketFactory: SSLSocketFactoryget() {try {val sslContext = SSLContext.getInstance("SSL")sslContext.init(null, trustManager, SecureRandom())return sslContext.socketFactory} catch (e: Exception) {throw RuntimeException(e)}}private val trustManager: Arrayget() = arrayOf(object : X509TrustManager {override fun checkClientTrusted(chain: Array, authType: String) {//do nothing}override fun checkServerTrusted(chain: Array, authType: String) {//do nothing}override fun getAcceptedIssuers(): Array {return arrayOf()}})/*** 订阅消息*/fun subscribeTopic(subTopic: String, qos: Int) {try {if (mqttAndroidClient != null && mqttAndroidClient!!.isConnected) {mqttAndroidClient!!.subscribe(subTopic, qos)}} catch (ex: MqttException) {System.err.println("Exception while subscribing")ex.printStackTrace()}}/*** 发布消息*/fun publishMessage(pubTopic: String, qos: Int, content: String) {try {if (mqttAndroidClient != null && mqttAndroidClient!!.isConnected) {mqttAndroidClient!!.publish(pubTopic, content.toByteArray(), qos, false)}} catch (e: MqttException) {System.err.println("Error Publishing: " + e.message)e.printStackTrace()}}fun release() {try {if (mqttAndroidClient != null) {mqttAndroidClient!!.unregisterResources()if (mqttAndroidClient!!.isConnected) {mqttAndroidClient!!.disconnect()}mqttAndroidClient!!.close()mqttAndroidClient = null}} catch (e: Exception) {e.printStackTrace()}}companion object {private val TAG = MqttManager::class.java.simpleNameval instance: MqttManagerget() = MqttManagerHolder.INSTANCE}
}

定义消息实体

/*** Created by ZhangJun on 2019/1/5.*/
class MqttMessageBean {var messageId: String = ""var messageContent: JSONObject = JSONObject()
}

相关内容

热门资讯

初中记叙文:那双紧握的手【推... 初中记叙文:那双紧握的手 篇一我记得那是一个寒冷的冬日,我和妈妈一起去参加一个社区活动。活动结束后,...
父亲初一作文【精简6篇】 父亲初一作文 篇一:我和爸爸的足球之旅爸爸是我的英雄,他是一位充满活力和热情的足球迷。每当有重要的足...
初一作文我的爱好800字记叙... 初一作文我的爱好800字记叙文 第一篇大家好!我是一个非常活泼可爱,并且爱好广泛的小女孩。我的业余爱...
最底层的人初一作文【推荐5篇... 最底层的人初一作文 篇一最底层的人初一作文我是一个来自最底层的人,我是一个农村的孩子。在我心中,最底...
新学期的一天初一作文【推荐3... 新学期的一天初一作文 篇一初一的新学期,阳光明媚,充满了希望和憧憬。我早早地起床,洗漱完毕,穿上整洁...
妈妈初一作文(精彩6篇) 妈妈初一作文 篇一:妈妈是我的英雄妈妈是我的英雄。她是一个充满爱心和勇气的人,她总是尽力为我们创造一...
教我如何不想她作文600字(... 教我如何不想她作文600字 篇一教我如何不想她我们常常会遇到一些让我们难以忘怀的人,尤其是在感情的世...
我不再什么作文600字初一通... 我不再什么作文600字初一 第一篇泥土到处都有,可当你不再闻到泥土的芬芳时,才会觉得它宝贵;植物随处...
七年级语文月考1(经典3篇) 七年级语文月考1 篇一:我眼中的好老师作为一名七年级学生,我曾经遇到过很多老师。有些老师严厉,有些老...
你快乐就好-初中作文【优质5... 你快乐就好-初中作文 篇一快乐是一种美妙的情绪,它能够让人心情愉悦、精神焕发。而我认为,一个人的快乐...
初一我收获了友谊作文700字... 初一我收获了友谊作文700字 第一篇面,风很大,天气阴沉沉的。“怦怦怦!怦怦怦!”“1、2、3、4…...
初一暑假一件事作文500字(... 初一暑假一件事作文500字 篇一初一暑假,我参加了一次短期夏令营活动。这是我第一次参加夏令营,我非常...
初中英语人称代词语法【经典3... 初中英语人称代词语法 篇一人称代词在英语语法中扮演着重要的角色。它们用来代替名词,并且根据人称的不同...
初一记忆中的暖流作文(优选6... 初一记忆中的暖流作文 篇一初一是我人生中一个重要的阶段,那段时间充满了回忆和暖流。初一的生活虽然紧张...
包装无悔生命初一作文(精选5... 包装无悔生命初一作文 篇一包装无悔生命生命是一场旅程,每个人都在这个旅程中扮演着不同的角色,承载着不...
初一满分写景作文共50篇 初一满分写景作文 第一篇时间真快,转眼间我就初一了,整整一个暑假都没有看见过母校的美景了。真是“归来...
月亮抒情作文范文初一推荐90... 月亮抒情作文范文初一 第一篇又到了一年一度的中秋节,我很高兴,因为我喜欢赏月,喜欢听中秋的美丽传说,...
青春风采初中作文(优秀5篇) 青春风采初中作文 篇一:追逐梦想的青春青春是一段美好的时光,是我们追逐梦想的时刻。初中时期,正是我们...
中秋奇趣初中作文(精简5篇) 中秋奇趣初中作文 篇一中秋佳节,是中国传统的重要节日之一。在这一天,人们会与家人团聚,品尝美食,赏月...
七年级我来了作文700字推荐... 七年级我来了作文700字 第一篇经过一个暑假的放松,我终于走进了初中校园的大门。满怀着激动与兴奋,我...