MQTT**(**消息隊列遙測傳輸)是ISO 標準(ISO/IEC PRF 20922)下基于發(fā)布/訂閱范式的消息協(xié)議。它工作在TCP/IP協(xié)議族上,是為硬件性能低下的遠程設備以及網(wǎng)絡狀況糟糕的情況下而設計的發(fā)布/訂閱型消息協(xié)議,為此,它需要一個消息中間件。
一、前景摘要
- DES版本:DevEco Studio 3.0 Release
- SDK版本:3.2.2.5 ( API9)
- npm版本:6.14.16
- EMQX:linux(Ubuntu)
- MQTTX:Version: v1.9.2
二、了解MQTT
1、什么是MQTT?
MQTT**(**消息隊列遙測傳輸)是ISO 標準(ISO/IEC PRF 20922)下基于發(fā)布/訂閱范式的消息協(xié)議。它工作在TCP/IP協(xié)議族上,是為硬件性能低下的遠程設備以及網(wǎng)絡狀況糟糕的情況下而設計的發(fā)布/訂閱型消息協(xié)議,為此,它需要一個消息中間件。
2、MQTT特性
MQTT協(xié)議是為大量計算能力有限,且工作在低帶寬、不可靠的網(wǎng)絡的遠程傳感器和控制設備通訊而設計的協(xié)議,它具有以下主要的幾項特性:
使用發(fā)布/訂閱消息模式,提供一對多的消息發(fā)布,解除應用程序耦合;
- 對負載內容屏蔽的消息傳輸。
- 使用 TCP/IP 提供網(wǎng)絡連接。
- 有三種消息發(fā)布服務質量:QoS(定閱等級),分0、1、2三個等級,簡單來說是等級越高越可靠。
- 小型傳輸,開銷很?。ü潭ㄩL度的頭部是 2 字節(jié)),協(xié)議交換最小化,以降低網(wǎng)絡流量。
- 使用 Last Will 和 Testament 特性通知有關各方客戶端異常中斷的機制。
3、應用場景(多客戶端,少量消息)
- 車聯(lián)網(wǎng)
- 工業(yè)互聯(lián)網(wǎng)
- 智能家居
- 視頻直播彈幕
- IM實時聊天(一對一聊天,群組聊天)
- 推送服務,比如推送實時新聞
- 金融交易數(shù)據(jù)訂閱推送
4、基本名詞
MQTT協(xié)議中的三種身份
- MQTT Broker:代理服務器
- Publish:發(fā)布者,發(fā)布消息
- Subscribe:訂閱者,訂閱消息
MQTT傳輸?shù)南?/h4>
主題(Topic),負載(payload)和QoS
- Topic:消息的類型,訂閱者訂閱后就會收到該主題的消息內容(payload).
- Payload:消息的內容,指訂閱者具體要使用的內容。
- QoS:服務質量.
“至多一次”(QoS0):消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡。會發(fā)生消息丟失或重復。這一級別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次讀記錄無所謂,因為不久后還會有第二次發(fā)送。即是推送之后就完事了,至于對方有沒有收到,收到是什么,數(shù)據(jù)有沒有丟失,都不管。
“至少一次”(QoS1):確保消息到達,但消息重復可能會發(fā)生。即是你收到推送后,你還得返回一個puback給對方,告訴對方收到了,不然對方會以為你沒收到,隔一段時間后重新給你推送,直到你給對方返回一個Puback為止。
“只有一次”(QoS2):確保消息到達一次。這一級別可用于如下情況,在計費系統(tǒng)中,消息重復或丟失會導致不正確的結果。
異常中斷的機制
使用 Last Will 和 Testament 特性通知有關各方客戶端。
- Last Will:即遺言機制,用于通知同一主題下的其他設備發(fā)送遺言的設備已經(jīng)斷開了連接。
- Testament:遺言機制,功能類似于Last Will。
三、MQTT的簡單使用
1、搭建MQTT服務器EMQX
MQTT服務器有很多種,且部署方式也不一樣。
Linux (Ubuntu)
安裝命令:
curl -s https://assets.emqx.com/scripts/install-emqx-deb.sh | sudo bash
sudo apt-get install emqx :安裝
sudo systemctl start emqx :啟動
下載鏈接:https://www.emqx.io/zh/downloads?os=Ubuntu
Docker:https://emqx.io/zh/downloads?os=Docker
Linux(centos):https://www.emqx.io/zh/downloads?os=CentOS
windows:https://www.emqx.io/zh/downloads?os=Windows
測試:確保emqx已正常運行后,可在Linux本地瀏覽器中輸入: http://127.0.0.1:18083。
注意:未登錄需修改密碼,兩次保持一致。
局域網(wǎng)其他電腦訪問須知Linux的IP地址。
2、MQTTX
創(chuàng)建連接
創(chuàng)建subscription
收發(fā)消息
#代表通配符,代表訂閱所有該類型的topic。
3、在OpenHarmony使用MQTT
安裝依賴
依賴地址:https://gitee.com/openharmony-tpc/ohos_mqtt。
有所不同的是我用的npm安裝的依賴npm install @ohos/mqtt。
創(chuàng)建Mqtt客戶端
創(chuàng)建mqtt客戶端并建立連接:
const clientOptions: MqttClientOptions = {
url: '192.168.xxx.xxx/:1883',
clientId: 'client_id_' + new Date().getTime(),
persistenceType: 1,
}
const connectOptions: MqttConnectOptions = {
userName: '',
password: '',
connectTimeout: 30,
}
this.mqClient = MqttAsync.createMqtt(this.clientOptions);
this.mqClient.connect(this.connectOptions, (data: MqttResponse) => {
console.log(TAG+" data: "+JSON.stringify(data));
if (data.code == 0) {
this.messageArrived();
this.subscribe('主設備號/#');
}
});
監(jiān)聽
//接收消息,使用此接口后,當訂閱的主題有消息發(fā)布時,會自動接收到消息。
public messageArrived(): void {
this.mqClient.messageArrived((err, data) => {
console.log(TAG+"messageArrived!!!!!!!!!!!");
console.log(TAG+"messageArrived data:"+JSON.stringify(data));
});
}
訂閱消息
// 訂閱消息
public subscribe(topic: string, qos: QoS = 1): void {
const subscribeOption: MqttSubscribeOptions = { topic, qos };
this.mqClient.subscribe(subscribeOption, (err, data)=>{
this.handleMessage(data)
});
}
發(fā)布消息
// 發(fā)布消息
public publish<T>(topic: string, payload: string | Record<string, any>, qos: QoS = 0): void {
if (typeof payload !== 'string') {
payload = JSON.stringify(payload)
}
const payloadLen = payload.length;
const publishOption: MqttPublishOptions = { topic, payload, qos, payloadLen };
console.log(TAG, 'publishOption data: ' + JSON.stringify(publishOption));
this.mqClient.publish(publishOption, (err, data)=>{
console.log(TAG+"publish!!!!!!!!!!!");
console.log(TAG+"publish data:"+JSON.stringify(data));
}));
}
根據(jù)訂閱的消息的主題的不同進行不同的處理
handleMessage(data:any){
console.log(TAG+"subscribe!!!!!!!!!!!");
console.log(TAG+"subscribe data:"+JSON.stringify(data));
//根據(jù)data的不同進行不同的處理
}
四、擴展
mqtt與MQ中間件的關系。
消息中間件是基于隊列與消息傳遞技術,在網(wǎng)絡環(huán)境中為應用系統(tǒng)提供同步或異步、可靠的消息傳輸?shù)闹涡攒浖到y(tǒng)。
MQTT 與消息隊列有一定的區(qū)別,隊列是一種先進先出的數(shù)據(jù)結構,消息隊列常用于應用服務層面,實現(xiàn)參考如 RabbitMQ Kafka RocketMQ。
MQTT 是傳輸協(xié)議,絕大部分 MQTT Broker 不保證消息順序(Queue),常用語物聯(lián)網(wǎng)、消息傳輸?shù)取?/p>