常见的mq有Kafka,RocketMQ和RabbitMQ,大家也很常见。 前者很常见,MQTT是什么呢?MQTT属于IoT也就是物联网的概念。
常见的mq有Kafka,RocketMQ和RabbitMQ,大家也很常见。MQTT是什么呢?
Kafka,RocketMQ和RabbitMQ属于微服务间的mq,而MQTT则属于IoT也就是物联网的概念。
mqtt.js是MQTT在nodejs端的实现。vue技术栈下的前端也可用。
mqtt.js官方为微信小程序和支付宝小程序也做了支持。微信小程序的MQTT协议名为 wxs ,支付宝小程序则是 alis 。
如果还是一脸懵逼,那么就跟随我通过mqtt.js去认识一下这个物联网领域的宠儿吧。
- 什么是微消息队列?
- MQTT关键名词解释
- P2P消息和Pub/Sub消息
- 封装的mqtt.js通用class
- 客户端发包函数sendPacket
- 客户端连接 mqtt. connect ()
- 订阅topic mqtt.Client#subscribe()
- 发送消息 mqtt.Client#publish()
- 接收消息 mqtt.Client#“ message ”事件
什么是微消息队列?
消息队列一般分为两种:
- 微服务消息队列(微服务间信息传递,典型代表有RabbitMQ,Kafka,RocketMQ)
- 物联网消息队列(物联网端与云端消息传递,代表有MQTT)
目前我实践过的,也就是我们本篇博文深入分析的,是物联网消息队列的mqtt.js。
传统的消息队列(微服务间信息传递)
传统的微服务间(多个子系统服务端间)消息队列是一种非常常见的服务端间消息传递的方式。
典型代表有:RabbitMQ,Kafka,RocketMQ。
阿里云 官网拥有AMQP(兼容RabbitMQ),Kafka,和RocketMQ这三种微服务消息队列,对于我们今后在实际项目中落地提供了很大的帮助。
更多微服务消息队列可查看: node-mq-tutorial
使用场景多种多样:
- 高并发:秒杀、抢票(FIFO)
- 共享型:积分兑换(多子系统共用积分模块)
- 通信型:服务端间消息传递(nodejs,java,python,go等等)
MQTT消息队列(物联网端与云间消息传递)
MQTT是一个物联网MQTT协议,主要解决的是物联网IoT网络情况复杂的问题。
阿里云有MQTT消息队列服务。通信协议支持MQTT,STOMP,GB-808等。数据传输层支持TCP长连接、SSL加密、Websocket等。
使用场景主要为数据传输:
- 车联网(远程控制,汽车数据上传)
- IM通讯(1对1单聊,1对多朋友圈)
- 视频直播(弹幕通知,聊天互动)
- 智能家居(电器数据上传,遥控指令)
目前我手上负责的运行了2年的聊天系统就是使用的这个服务,我们主要按照 设备<->server<->PC 的方式, MQTT协议,Websocket传输协议 进行设备与PC间的数据通信。
MQTT关键名词解释
实例(Instance)
每个MQTT实例都对应一个全局唯一的服务接入点。
肉眼可见的区别就是在通过 mqtt.connect(url) 与server(broker)建立连接时,broker的url都是一致的。
假设有saleman1,salesman2···他们本地的前端与服务端间建立连接的url都是统一的,只是在clientId进行区分即可。
客户端Id(Client ID)
MQTT的Client ID是每个客户端的唯一标识,要求全局都是唯一的,使用同一个Client ID连接会被拒绝。
阿里云的ClientID由两部分组成 <GroupID>@@@<DeviceID> 。
通常情况下Group ID是多前端统一的,比如PC端,安卓移动端,ios移动端,DeviceID也是多前端统一的。
那么如何区分多端呢?可以对Client ID中间的@@@做修改。
比如:
let CID_PC = `<GroupID>@@@-PC<DeviceID>` | |
let CID_Android = `<GroupID>@@@-Android<DeviceID>` | |
let CID_IOS = `<GroupID>@@@-IOS<DeviceID>` |
组Id(Group ID)
用于指定一组逻辑功能完全一致的节点公用的组名,代表的是一类相同功能的设备。
Device ID
每个设备独一无二的标识。这个需要保证全局唯一,可以是每个传感器设备的序列号,可以是登录PC的userId。
父主题(Parent Topic)
MQTT协议基于Pub/Sub模型,任何消息都属于一个Topic。
Topic可以存在多级,第一级为父级Topic。
需要控制台单独创建。
子主题(Subtopic)
MQTT可以有二级Topic,也可以有三级Topic。
无需创建,代码中直接写即可。
P2P消息和Pub/Sub消息
Pub/Sub消息就是订阅和发布的模式,类似事件监听和广播。
如果对发布订阅不理解,可以去看Webhook到底是个啥?
MQTT除了支持Pub/Sub的模式,还支持P2P的模式。
什么是P2P消息?
- P2P,全称为(Point to Point)。
- 一对一的消息收发模式,只有一个消息发送者和一个消息接收者。
- P2P模式下,消息发送者明确知道消息的预期接收者,并且这个消息只能被这个特定的 客户端消费 。
- 发送者发送消息时,通过Topic指定接收者,接收者无需订阅即可获得该消息。
- P2P 模式不仅降低注册订阅的成本,而且因为对链路有优化,所以降低推送延迟。
P2P模式和Pub/Sub模式的区别
发送消息时
- Pub/Sub模式下,发送者需要按照与接受者约定好的Topic发送消息
- P2P模式下,发送者无需按照Tpic发送,可以直接按照规范进行发送
接收消息时
- Pub/Sub模式下,接收者需要提前订阅topic才能接消息
- P2P模式下无需订阅即可接收消息
nodejs发送P2P消息
const ppTopic =topic+"/p2p/GID_xxxx@@@DEVICEID_001"; | |
mqtt.client.publish(ppTopic); |
封装的mqtt.js通用class
- 客户端连接 initClient( config )
- 订阅topic subscribeTopic(topic, config)
- 发送消息 publishMessage(message)
- 接收消息 handleMessage(callback)
import mqtt from 'mqtt'; | |
import config from '@/config'; | |
export default class MQTT { | |
constructor(options) { | |
this.name = options.name; | |
this.connecting = false; | |
} | |
/** | |
* 客户端连接 | |
*/ initClient(config) { | |
const { url, groupId, key, password, topic: { publish: publishTopic }} = config; | |
return new Promise((resolve) => { | |
this.client = mqtt.connect( | |
{ | |
url, | |
clientId: `${groupId}@@@${deviceId}`, | |
username: key, | |
password, | |
} | |
); | |
this.client.on('connect', () => { | |
this.connecting = true; | |
resolve(this); | |
}); | |
}); | |
} | |
/** | |
* 订阅topic | |
*/ subscribeTopic(topic, config) { | |
if (this.connecting) { | |
this.client.subscribe(topic, config); | |
} | |
return this; | |
} | |
/** | |
* 发送消息 | |
*/ publishMessage(message) { | |
this.client.publish(publishTopic, message, { qos: }); | |
} | |
/** | |
* 接收消息 | |
*/ handleMessage(callback) { | |
if (!this.client._events.message) { | |
this.client.on('message', callback); | |
} | |
} | |
} |
客户端发包函数sendPacket
mqtt-packet生成一个可传输buffer
var mqtt = require('mqtt-packet') | |
var object = { | |
cmd: 'publish', | |
retain: false, | |
qos:, | |
dup: false, | |
length:, | |
topic: 'test', | |
payload: 'test' // Can also be a Buffer | |
} | |
var opts = { protocolVersion: } // default is 4. Usually, opts is a connect packet | |
console.log(mqtt.generate(object)) | |
// Prints: | |
// | |
// <Buffer 0a 00 04 74 65 73 74 74 65 73 74> | |
// | |
// Which is the same as: | |
// | |
// new Buffer([ | |
//, 10, // Header (publish) | |
//, 4, // Topic length | |
//, 101, 115, 116, // Topic (test) | |
//, 101, 115, 116 // Payload (test) | |
// ]) |
sendPacket函数
发出packetsend事件并且通过mqtt.writeToStream将packet写入client的stream中。
var mqttPacket = require('mqtt-packet') | |
function sendPacket (client, packet) { | |
client.emit('packetsend', packet) | |
mqttPacket.writeToStream(packet, client.stream, client.options) | |
} |
_sendPack方法
MqttClient. prototype ._sendPacket = function (packet) { | |
sendPacket(this, packet); | |
} |
客户端连接 mqtt.connect()
mqtt client建立与mqtt server(broker)的连接,通常是通过给定一个’mqtt’, ‘mqtts’, ‘tcp’, ‘tls’, ‘ws’, ‘wss’, ‘wxs’ , ‘alis’为协议的url进行连接。
mqtt.connect([url], options)
官方说明:
- 通过给定的url和配置连接到一个broker,并且返回一个Client。
- url可以遵循以下协议:’mqtt’, ‘mqtts’, ‘tcp’, ‘tls’, ‘ws’, ‘wss’, ‘wxs’ , ‘alis’。( mqtt.js支持微信小程序和支付宝小程序,协议分别为wxs和alis。 )
- url也可以是通过URL.parse()返回的对象。
- 可以传入一个单对象,既包含url又包含选项。
再来看一下我手上项目的连接配置,连接结果。
敏感信息已通过foo,bar,baz或者xxxx的组合进行数据脱敏处理。
连接配置
{ | |
key: 'xxxxxxxx', | |
secret: 'xxxxxxxx', | |
url: 'wss://foo-bar.mqtt.baz.com/mqtt', | |
groupId: 'FOO_BAR_BAZ_GID', | |
topic: { | |
publish: 'PUBLISH_TOPIC', | |
subscribe: ['PUBLISH_TOPIC/noticePC/', 'PUBLISH_TOPIC/pp'], | |
unsubscribe: 'PUBLISH_TOPIC/noticeMobile/', | |
}, | |
} |
- key 账号
- secret 密码
- url 用于建立client与server(broker)mqtt连接的链接
- groupId 组id
- topic 发送消息的topic,订阅的topic,取消订阅的topic
连接结果
包括总览,响应头和请求头。
General
Request URL: wss://foo-bar.mqtt.baz.com | |
Request Method: GET | |
Status Code: Switching Protocols |
Response Header
HTTP/.1 101 Switching Protocols | |
upgrade: websocket | |
connection: upgrade | |
sec-websocket-accept: xxxxxxx | |
sec-websocket-protocol: mqtt |
Request Header
GET wss://foo-bar.mqtt.baz.com/ HTTP/.1 | |
Host: foo-bar.mqtt.baz.com | |
Connection: Upgrade | |
Pragma: no-cache | |
Cache-Control: no-cache | |
User-Agent: Mozilla/.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36 | |
Upgrade: websocket | |
Origin: | |
Sec-WebSocket-Version: | |
Accept-Encoding: gzip, deflate, br | |
Accept-Language: zh-CN,zh;q=.9,en-US;q=0.8,en;q=0.7,zh-TW;q=0.6 | |
Sec-WebSocket-Key: xxxxxxxxx | |
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits | |
Sec-WebSocket-Protocol: mqtt |
源码分析
下面来看这段mqtt连接的代码。
this.client = mqtt.connect( | |
{ | |
url, | |
clientId: `${groupId}@@@${deviceId}`, | |
username: key, | |
password, | |
} | |
); | |
function parseAuthOptions (opts) { | |
var matches | |
if (opts.auth) { | |
matches = opts.auth.match(/^(.+):(.+)$/) | |
if (matches) { | |
opts.username = matches[] | |
opts.password = matches[] | |
} else { | |
opts.username = opts.auth | |
} | |
} | |
} | |
/** | |
* connect - connect to an MQTT broker. | |
* | |
* @param {String} [brokerUrl] - url of the broker, optional | |
* @param {Object} opts - see MqttClient#constructor | |
*/function connect (brokerUrl, opts) { | |
if ((typeof brokerUrl === 'object') && !opts) { | |
// 可以传入一个单对象,既包含url又包含选项 | |
opts = brokerUrl | |
brokerUrl = null | |
} | |
opts = opts || {} | |
// 设置username和password | |
parseAuthOptions(opts) | |
if (opts.query && typeof opts.query.clientId === 'string') { | |
// 设置Client Id | |
opts.clientId = opts.query.clientId | |
} | |
function wrapper (client) { | |
... | |
return protocols[opts.protocol](client, opts) | |
} | |
// 最终返回一个mqtt client实例 | |
return new MqttClient(wrapper, opts) | |
} |
订阅topic mqtt.Client#subscribe()
实际代码
const topic = { | |
subscribe: ['PUBLISH_TOPIC/noticePC/', 'PUBLISH_TOPIC/pp'], | |
unsubscribe: 'PUBLISH_TOPIC/noticeMobile/', | |
}; | |
const config = { qos: }; | |
this.client.subscribe(topic.subscribe, config) |
源码分析
MqttClient.prototype.subscribe = function () { | |
var packet | |
var args = new Array(arguments.length) | |
for (var i =; i < arguments.length; i++) { | |
args[i] = arguments[i] | |
} | |
var subs = [] | |
// obj为订阅的topic列表 | |
var obj = args.shift() | |
// qos等配置 | |
var opts = args.pop() | |
var defaultOpts = { | |
qos: | |
} | |
opts = xtend(defaultOpts, opts) | |
// 数组类型的订阅的topic列表 | |
if (Array.isArray(obj)) { | |
obj.forEach(function (topic) { | |
if (!that._resubscribeTopics.hasOwnProperty(topic) || | |
that._resubscribeTopics[topic].qos < opts.qos || | |
resubscribe) { | |
var currentOpts = { | |
topic: topic, | |
qos: opts.qos | |
} | |
// subs是最终的订阅的topic列表 | |
subs.push(currentOpts) | |
} | |
}) | |
} | |
// 这个packet很重要 | |
packet = { | |
// 发出订阅命令 | |
cmd: 'subscribe', | |
subscriptions: subs, | |
qos:, | |
retain: false, | |
dup: false, | |
messageId: this._nextId() | |
} | |
// 发出订阅包 | |
this._sendPacket(packet) | |
return this | |
} |
发送消息 mqtt.Client#publish()
实际代码
const topic = { | |
publish: 'PUBLISH_TOPIC', | |
}; | |
const messge = { | |
foo: '', | |
bar: '', | |
baz: '', | |
... | |
} | |
const msgStr = JSON.stringify(message); | |
this.client.publish(topic.publish, msgStr); |
注意publish的消息需要使用JSON.stringify进行序列化,然后再发到指定的topic。
源码分析
MqttClient.prototype.publish = function (topic, message, opts, callback) { | |
var packet | |
var options = this.options | |
var defaultOpts = {qos:, retain: false, dup: false} | |
opts = xtend(defaultOpts, opts) | |
// 将消息传入packet的payload | |
packet = { | |
cmd: 'publish', | |
topic: topic, | |
payload: message, | |
qos: opts.qos, | |
retain: opts.retain, | |
messageId: this._nextId(), | |
dup: opts.dup | |
} | |
// 处理不同qos | |
switch (opts.qos) { | |
case: | |
case: | |
// 发出publish packet | |
this._sendPacketI(packet); | |
... | |
default: | |
this._sendPacket(packet); | |
... | |
} | |
return this | |
} |
接收消息 mqtt.Client “message”事件
实际代码
this.client.on('message', callback);
数据以callback的方式接收。
function (topic, message, packet) {}
topic代表接收到的topic,buffer则是具体的数据。
message是接收到的数据,谨记通过JSON.parse()对buffer做解析。
handleMessage(callback) { | |
this.client.on('message', callback); | |
} | |
this.client.handleMessage((topic, buffer) => { | |
let receiveMsg = null; | |
try { | |
receiveMsg = JSON.parse(buffer.toString()); | |
} catch (e) { | |
receiveMsg = null; | |
} | |
if (!receiveMsg) { | |
return; | |
} | |
...do something with receiveMsg... | |
}); |
源码分析
MqttClient继承了EventEmitter。
从而进行可以使用on监听“message”事件。
inherits(MqttClient, EventEmitter)
那么到底是在哪里间发出message事件的呢?>emit the message event
- 基于websocket-stream建立websocket连接
- 使用pipe连接基于readable-stream.Writable创建的可写流
- nextTick调用_handlePacket
- 在handlePacket中调用handlePublish发出message事件
1.基于websocket-stream建立websocket连接
this.stream = this.streamBuilder(this) | |
function streamBuilder (client, opts) { | |
return createWebSocket(client, opts) | |
} | |
var websocket = require('websocket-stream') | |
function createWebSocket (client, opts) { | |
var websocketSubProtocol = | |
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion ===) | |
? 'mqttv.1' | |
: 'mqtt' | |
setDefaultOpts(opts) | |
var url = buildUrl(opts, client) | |
return websocket(url, [websocketSubProtocol], opts.wsOptions) | |
} |
2. 使用pipe连接基于readable-stream.Writable创建的可写流
var Writable = require('readable-stream').Writable | |
var writable = new Writable(); | |
this.stream.pipe(writable); |
3.nextTick调用_handlePacket
writable._write = function (buf, enc, done) { | |
completeParse = done | |
parser.parse(buf) | |
work() | |
} | |
function work () { | |
var packet = packets.shift() | |
if (packet) { | |
that._handlePacket(packet, nextTickWork) | |
} | |
} | |
function nextTickWork () { | |
if (packets.length) { | |
process.nextTick(work) | |
} else { | |
var done = completeParse | |
completeParse = null | |
done() | |
} | |
} |
4. 在handlePacket中调用handlePublish发出message事件
MqttClient.prototype._handlePacket = function (packet, done) { | |
switch (packet.cmd) { | |
case 'publish': | |
this._handlePublish(packet, done) | |
break | |
... | |
} | |
// emit the message event | |
MqttClient.prototype._handlePublish = function (packet, done) { | |
switch (qos) { | |
case: { | |
// emit the message event | |
if (!code) { that.emit('message', topic, message, packet) } | |
} | |
} |