专业的编程技术博客社区

网站首页 > 博客文章 正文

Android集成MQTT客户端实践

baijin 2024-11-20 12:32:13 博客文章 2 ℃ 0 评论

现在物联网越来越深入我们的生活了,随处可见的穿戴设备、传感器,无时无刻不在采集着我们的信息。比如智能手环,我们运动的步数、心率等数据被小小的传感器时刻记录着。

一般来说,这些数据可以直接通过蓝牙等协议传输,但也有不少是通过其他途径,比如MQTT,设备采集的数据直接上传到云端的MQTT服务器上,而其他的终端如手机app则通过订阅获取数据,进行展示及数据分析等。

今天我们就来看看一个Android程序该如何集成MQTT client来与设备进行数据交互。

集成

首先引入依赖

implementation "org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0"
implementation "org.eclipse.paho:org.eclipse.paho.android.service:1.1.1"
implementation "androidx.localbroadcastmanager:localbroadcastmanager:1.0.0"

注意这里引入了"androidx.localbroadcastmanager",是因为我们项目使用了androidx,项目编译时报了"support.v4"包找不到的错误,而androidx中又不能直接引入"support.v4",网上找了一圈发现了这个靠谱的方法。当然,这还不够,还要在gradle.properties中增加下面一个配置项

android.enableJetifier=true

接下来要在AndroidManifest.xml中声明MqttService,如下

<service android:name="org.eclipse.paho.android.service.MqttService" />

这样mqtt的基本集成就算完成了。

使用

以下示例代码使用Kotlin语言,不过大家应该都能看懂。

首先我们需要一些配置项来接入MQTT服务器,多说一句,这个mqtt server可以直接用开源的自己部署,也可以直接买云厂商提供的。我们客户端集成需要的配置项如下

class Config {
    companion object {
        const val serverUri = "xxx"
        const val clientId = "xxx"
        const val instanceId = "xxx"
        const val accessKey = "xxx"
        const val secretKey = "xxx"
    }
}

这里歪哥项目中使用的是阿里云的mqtt server,所以接入配置像上面这样,其他的可能稍有不同,具体看官方给的文档就行。

然后就是具体的订阅、发布等操作了

class MqttSimple(var mqttAndroidClient: MqttAndroidClient) {

    val topicCommand = "xxx"
    val topicData = "xxx"

    // 订阅
    fun subscribe() {
        try {
            mqttAndroidClient.setCallback(object : MqttCallbackExtended {
                override fun connectComplete(reconnect: Boolean, serverURI: String) {

                }

                override fun connectionLost(cause: Throwable) {
                    Log.e("mqtt", "connectionLost", cause)
                }

                @Throws(Exception::class)
                override fun messageArrived(topic: String, message: MqttMessage) {
                    val body = String(message.payload)
                    Log.w("mqtt", "messageArrived, topic: $topic, msg: $body")
                }

                override fun deliveryComplete(token: IMqttDeliveryToken) {}
            })

            val mqttConnectOptions = MqttConnectOptions()
            mqttConnectOptions.isAutomaticReconnect = true
            mqttConnectOptions.userName = "Signature|${Config.accessKey}|${Config.instanceId}"
            mqttConnectOptions.password =
                Tool.macSignature(Config.clientId, Config.secretKey).toCharArray()

            mqttAndroidClient.connect(mqttConnectOptions, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken) {
                    Log.w("mqtt", "connect onSuccess")
                    subscribe(topicData)
                }

                override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
                    Log.e("mqtt", "connect onFailure", exception)
                }
            })
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }
    
    // 发布(发送一条指令)
    fun sendCommand(cmd: String) {
        try {
            val mqttConnectOptions = MqttConnectOptions()
            mqttConnectOptions.isAutomaticReconnect = true
            mqttConnectOptions.userName = "Signature|${Config.accessKey}|${Config.instanceId}"
            mqttConnectOptions.password =
                Tool.macSignature(Config.clientId, Config.secretKey).toCharArray()

            mqttAndroidClient.connect(mqttConnectOptions, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken) {
                    Log.w("mqtt", "connect onSuccess")
                    publish(cmd, topicCommand)
                }

                override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
                    Log.e("mqtt", "connect onFailure", exception)
                }
            })
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }

    private fun publish(msg: String, topic: String) {
        try {
            val message = MqttMessage()
            message.payload = msg.toByteArray()
            mqttAndroidClient.publish(topic, message, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken) {
                    Log.w("mqtt", "publish success, topic: $topic, msg: $msg")
                }

                override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
                    Log.w("mqtt", "publish failed, topic: $topic, msg: $msg")
                }
            })
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }

    private fun subscribe(topic: String) {
        try {
            val topicFilter = arrayOf(topic)
            val qos = intArrayOf(1)
            mqttAndroidClient.subscribe(topicFilter, qos, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken) {
                    Log.w("mqtt", "subscribe success, topic: $topic")
                }

                override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
                    Log.e("mqtt", "subscribe failed", exception)
                }
            })
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }
}

这里有两个topic,topicCommand是数据发送的通道,我们可以以此向设备发动指令,topicData是数据订阅接收的通道。当然,示例中有可以优化的地方,比如不应该每次都重复连接server,大家实际使用时要注意些。

总结

MQTT类似于RocketMQ这种,一般是用做物联网场景的消息传输协议,现在这种物联网数据采集传输的场景还是很多的,特别是跟大模型的结合,已然是热门研究方向,大家还是有必要了解一下的。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表