private handleMessage()

in libs/mqtt/mqtt.ts [556:641]


        /**
         * Feeds a chunk of received bytes into the MQTT packet parser.
         *
         * Bytes left over from earlier calls (`this.buf`) are prepended to
         * `data`. Every complete control packet found in the combined buffer
         * is dispatched in order; a trailing partial packet stays in
         * `this.buf` until more data arrives.
         *
         * Only one- and two-byte "remaining length" headers are decoded, so a
         * packet whose remaining length exceeds 16383 bytes is rejected with
         * an 'error' event and the buffered data is dropped.
         *
         * @param data bytes just received; may hold a fragment of a packet,
         *             exactly one packet, or several packets back to back
         */
        private handleMessage(data: Buffer) {
            // Iterate over the packets in the chunk rather than recursing once
            // per packet, so stack depth stays constant no matter how many
            // small packets arrive together.
            while (true) {
                if (this.buf)
                    data = this.buf.concat(data)
                // Keep everything buffered until a whole packet is known to be present.
                this.buf = data
                if (data.length < 2)
                    return // need the first header byte and at least one length byte

                let len = data[1]
                let payloadOff = 2
                if (len & 0x80) {
                    // High bit set: the remaining length continues in a second byte.
                    if (data.length < 3)
                        return
                    if (data[2] & 0x80) {
                        // A third length byte would be required; that is not supported here.
                        this.emit('error', `too large packet.`);
                        this.buf = null
                        return
                    }
                    len = (data[2] << 7) | (len & 0x7f)
                    payloadOff++
                }

                const payloadEnd = payloadOff + len
                if (data.length < payloadEnd)
                    return // wait for the rest of data

                // A full packet is available; it is consumed below.
                this.buf = null

                const cmd = data[0]
                // The packet type is the upper four bits of the first byte.
                const controlPacketType: ControlPacketType = cmd >> 4;
                // this.emit('debug', `Rcvd: ${controlPacketType}: '${data}'.`);

                // NOTE(review): slice is called as (offset, length) here, which fits a
                // Buffer type whose slice takes a length as its second argument
                // (consistent with the instance `concat` used above) - confirm against
                // the Buffer implementation in use before changing it.
                const payload = data.slice(payloadOff, payloadEnd - payloadOff)

                switch (controlPacketType) {
                    case ControlPacketType.ConnAck: {
                        // The second payload byte is read as the connect return code.
                        const returnCode: number = payload[1];
                        if (returnCode === ConnectReturnCode.Accepted) {
                            this.log('MQTT connection accepted.');
                            // NOTE(review): 'connected' is emitted before the status field
                            // is updated; listeners run with the previous status.
                            this.emit('connected');
                            this.status = Status.Connected;
                            this.piId = setInterval(() => this.ping(), Constants.PingInterval * 1000);
                            // Send every entry recorded in this.subs.
                            for (const sub of this.subs)
                                this.send1(sub);
                        } else {
                            const connectionError: string = Client.describe(returnCode);
                            this.log('MQTT connection error: ' + connectionError);
                            this.emit('error', connectionError);
                            this.disconnect()
                        }
                        break;
                    }
                    case ControlPacketType.Publish: {
                        const message: IMessage = Protocol.parsePublish(cmd, payload);
                        this.trace(`incoming: ${message.topic}`)
                        let handled = false
                        let cleanup = false
                        if (this.mqttHandlers) {
                            // A handler matches when its topic is a prefix of the message topic.
                            for (const h of this.mqttHandlers)
                                if (message.topic.slice(0, h.topic.length) == h.topic) {
                                    h.handler(message)
                                    handled = true
                                    if (h.status == HandlerStatus.Once) {
                                        // One-shot handlers are only marked here and
                                        // removed after the loop finishes.
                                        h.status = HandlerStatus.ToRemove
                                        cleanup = true
                                    }
                                }
                            if (cleanup)
                                this.mqttHandlers = this.mqttHandlers.filter(entry => entry.status != HandlerStatus.ToRemove)
                        }
                        // Fall back to the generic 'receive' event when no handler matched.
                        if (!handled)
                            this.emit('receive', message);
                        if (message.qos > 0) {
                            // Acknowledge from a zero-delay timer rather than inline.
                            setTimeout(() => {
                                this.send1(Protocol.createPubAck(message.pid || 0));
                            }, 0);
                        }
                        break;
                    }
                    case ControlPacketType.PingResp:
                    case ControlPacketType.PubAck:
                    case ControlPacketType.SubAck:
                        // Nothing to do for these packet types.
                        break;
                    default:
                        this.emit('error', `MQTT unexpected packet type: ${controlPacketType}.`);
                }

                if (data.length <= payloadEnd)
                    return
                // More bytes follow this packet in the same chunk: parse them next.
                data = data.slice(payloadEnd)
            }
        }