in device/transport/mqtt/src/mqtt_twin_client.ts [52:135]
constructor(client: MqttBase) {
super();
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_18_001: [The `MqttTwinClient` constructor shall accept a `client` object.]*/
this._mqtt = client;
const messageHandler = this._onMqttMessage.bind(this);
/*Codes_SRS_NODE_DEVICE_MQTT_TWIN_CLIENT_16_001: [The `MqttTwinClient` constructor shall immediately subscribe to the `message` event of the `client` object.]*/
this._mqtt.on('message', messageHandler);
this._topicFsm = new machina.BehavioralFsm({
initialState: 'unsubscribed',
states: {
unsubscribed: {
_onEnter: function (_topicSubscription: TopicSubscription, err: Error, callback: (err?: Error) => void): void {
if (callback) {
callback(err);
}
},
subscribe: function (topicSubscription: TopicSubscription, callback: (err?: Error) => void): void {
this.transition(topicSubscription, 'subscribing', callback);
},
unsubscribe: function (_topicSubscription: TopicSubscription, callback: (err?: Error) => void): void {
// not entirely sure about that. if subscription are restored because cleanSession is false, it means technically a user may want to unsubscribe
// even though subscribe hasn't been called yet.
callback();
}
},
subscribing: {
_onEnter: function (topicSubscription: TopicSubscription, callback: (err?: Error) => void): void {
topicSubscription.mqttClient.subscribe(topicSubscription.topic, { qos: 0 }, (err, result) => {
if (err) {
this.transition(topicSubscription, 'unsubscribed', err, callback);
} else {
debug('subscribed to response topic: ' + JSON.stringify(result));
this.transition(topicSubscription, 'subscribed', callback);
}
});
},
'*': function (topicSubscription: TopicSubscription): void { this.deferUntilTransition(topicSubscription); }
},
subscribed: {
_onEnter: function (_topicSubscription: TopicSubscription, callback: (err?: Error) => void): void {
callback();
},
subscribe: function (_topicSubscription: TopicSubscription, callback: (err?: Error) => void): void {
callback();
},
unsubscribe: function (topicSubscription: TopicSubscription, callback: (err?: Error) => void): void {
this.transition(topicSubscription, 'unsubscribing', callback);
}
},
unsubscribing: {
_onEnter: function (topicSubscription: TopicSubscription, callback: (err?: Error) => void): void {
topicSubscription.mqttClient.unsubscribe(topicSubscription.topic, (err, _result) => {
if (err) {
debugErrors('failed to unsubscribe: ' + err);
} else {
debug('unsubscribed from: ' + topicSubscription.topic);
}
this.transition(topicSubscription, 'unsubscribed', err, callback);
});
},
'*': function (topicSubscription: TopicSubscription): void { this.deferUntilTransition(topicSubscription); }
}
},
subscribe: function (topicSubscription: TopicSubscription, callback: (err?: Error) => void): void {
this.handle(topicSubscription, 'subscribe', callback);
},
unsubscribe: function (topicSubscription: TopicSubscription, callback: (err?: Error) => void): void {
this.handle(topicSubscription, 'unsubscribe', callback);
}
});
this._responseTopic = {
mqttClient: this._mqtt,
topic: responseTopic
};
this._desiredPropertiesUpdatesTopic = {
mqttClient: this._mqtt,
topic: desiredPropertiesUpdatesTopic
};
}