in libs/mqtt/mqtt.ts [556:641]
private handleMessage(data: Buffer) {
if (this.buf)
data = this.buf.concat(data)
this.buf = data
if (data.length < 2)
return
let len = data[1]
let payloadOff = 2
if (len & 0x80) {
if (data.length < 3)
return
if (data[2] & 0x80) {
this.emit('error', `too large packet.`);
this.buf = null
return
}
len = (data[2] << 7) | (len & 0x7f)
payloadOff++
}
const payloadEnd = payloadOff + len
if (data.length < payloadEnd)
return // wait for the rest of data
this.buf = null
const cmd = data[0]
const controlPacketType: ControlPacketType = cmd >> 4;
// this.emit('debug', `Rcvd: ${controlPacketType}: '${data}'.`);
const payload = data.slice(payloadOff, payloadEnd - payloadOff)
switch (controlPacketType) {
case ControlPacketType.ConnAck:
const returnCode: number = payload[1];
if (returnCode === ConnectReturnCode.Accepted) {
this.log('MQTT connection accepted.');
this.emit('connected');
this.status = Status.Connected;
this.piId = setInterval(() => this.ping(), Constants.PingInterval * 1000);
for (const sub of this.subs)
this.send1(sub);
} else {
const connectionError: string = Client.describe(returnCode);
this.log('MQTT connection error: ' + connectionError);
this.emit('error', connectionError);
this.disconnect()
}
break;
case ControlPacketType.Publish:
const message: IMessage = Protocol.parsePublish(cmd, payload);
this.trace(`incoming: ${message.topic}`)
let handled = false
let cleanup = false
if (this.mqttHandlers) {
for (let h of this.mqttHandlers)
if (message.topic.slice(0, h.topic.length) == h.topic) {
h.handler(message)
handled = true
if (h.status == HandlerStatus.Once) {
h.status = HandlerStatus.ToRemove
cleanup = true
}
}
if (cleanup)
this.mqttHandlers = this.mqttHandlers.filter(h => h.status != HandlerStatus.ToRemove)
}
if (!handled)
this.emit('receive', message);
if (message.qos > 0) {
setTimeout(() => {
this.send1(Protocol.createPubAck(message.pid || 0));
}, 0);
}
break;
case ControlPacketType.PingResp:
case ControlPacketType.PubAck:
case ControlPacketType.SubAck:
break;
default:
this.emit('error', `MQTT unexpected packet type: ${controlPacketType}.`);
}
if (data.length > payloadEnd)
this.handleMessage(data.slice(payloadEnd))
}