in common/transport/mqtt/src/mqtt_base.ts [152:326]
constructor(mqttProvider?: any) {
super();
this.mqttProvider = mqttProvider ? mqttProvider : require('mqtt');
this._onTheWirePublishes = new OnTheWireMessageContainer();
this._fsm = new machina.Fsm({
namespace: 'mqtt-base',
initialState: 'disconnected',
states: {
disconnected: {
_onEnter: (callback, err) => {
debug('In MQTT base FSM - entered onEnter for disconnect');
//
// The semantics of this _onEnter for the disconnected state (which is the initial state)
// is that we got here from another one of the states of this FSM.
//
// So there was a disconnection.
//
// If there are any outstanding publishes, fail them. We will never see
// their acknowledgements (PUBACK). It is important to acknowledge that
// the publishes that were "on the wire", might indeed make it to the peer. We'll
// never know. If the code further up the stack retries, we could indeed get
// duplication of published data. Nothing we can really do about it.
//
this._onTheWirePublishes.purge(err);
//
// One of the other states was able to pass along a callback. Use it to finish up whatever
// operation the state machine was working on.
//
// If there is no callback present, the clear implication is that something pretty major occurred,
// NOT in the context of any particular operation. There is NO operation that this error can be reported
// as a result for. Hence we emit the 'error' event.
//
if (callback) {
callback(err);
} else {
if (err) {
debugErrors('In mqtt base - no callback for error - emitting \'error\': ' + this._errorDescription(err));
this.emit('error', err);
}
}
},
connect: (callback) => this._fsm.transition('connecting', callback),
disconnect: (callback) => callback(),
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_020: [The `publish` method shall call the callback with a `NotConnectedError` if the connection hasn't been established prior to calling `publish`.]*/
publish: (_topic, _payload, _options, callback) => callback(new errors.NotConnectedError()),
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_026: [The `subscribe` method shall call the callback with a `NotConnectedError` if the connection hasn't been established prior to calling `publish`.]*/
subscribe: (_topic, _options, callback) => callback(new errors.NotConnectedError()),
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_027: [The `unsubscribe` method shall call the callback with a `NotConnectedError` if the connection hasn't been established prior to calling `publish`.]*/
unsubscribe: (_topic, callback) => callback(new errors.NotConnectedError()),
updateSharedAccessSignature: (callback) => {
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_034: [The `updateSharedAccessSignature` method shall not trigger any network activity if the mqtt client is not connected.]*/
debug('updating shared access signature while disconnected');
callback();
}
},
connecting: {
_onEnter: (connectCallback) => {
this._connectClient((err, connack) => {
if (err) {
this._fsm.transition('disconnecting', connectCallback, err);
} else {
this._mqttTrackedListeners.addTrackedListener('error', this._errorCallback.bind(this));
this._fsm.transition('connected', connectCallback, connack);
}
});
},
disconnect: (callback) => {
this._fsm.transition('disconnecting', callback);
},
'*': () => this._fsm.deferUntilTransition()
},
connected: {
_onEnter: (connectCallback, connack) => {
this._mqttTrackedListeners.addTrackedListener('close', this._closeCallback.bind(this));
connectCallback(null, new results.Connected(connack));
},
connect: (callback) => callback(null, new results.Connected()),
disconnect: (callback) => this._fsm.transition('disconnecting', callback),
publish: (topic, payload, options, callback) => {
const thisPublishIdentifier = this._onTheWirePublishes.provideIdentifier();
this._onTheWirePublishes.add(thisPublishIdentifier, new OnTheWireMessage(callback));
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_017: [The `publish` method publishes a `payload` on a `topic` using `options`.]*/
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_021: [The `publish` method shall call `publish` on the mqtt client object and call the `callback` argument with `null` and the `puback` object if it succeeds.]*/
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_022: [The `publish` method shall call the `callback` argument with an Error if the operation fails.]*/
this._mqttClient.publish(topic, payload, options, (err, result) => {
this._onTheWirePublishes.complete(thisPublishIdentifier, err, result);
});
},
subscribe: (topic, options, callback) => {
/*Codes_SRS_NODE_COMMON_MQTT_BASE_12_008: [The `subscribe` method shall call `subscribe` on MQTT.JS library and pass it the `topic` and `options` arguments.]*/
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_024: [The `subscribe` method shall call the callback with `null` and the `suback` object if the mqtt library successfully subscribes to the `topic`.]*/
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_025: [The `subscribe` method shall call the callback with an `Error` if the mqtt library fails to subscribe to the `topic`.]*/
this._mqttClient.subscribe(topic, options, callback);
},
unsubscribe: (topic, callback) => {
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_028: [The `unsubscribe` method shall call `unsubscribe` on the mqtt library and pass it the `topic`.]*/
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_029: [The `unsubscribe` method shall call the `callback` argument with no arguments if the operation succeeds.]*/
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_030: [The `unsubscribe` method shall call the `callback` argument with an `Error` if the operation fails.]*/
this._mqttClient.unsubscribe(topic, callback);
},
updateSharedAccessSignature: (callback) => {
this._fsm.transition('reconnecting', callback);
},
closeEvent: () => {
this._fsm.transition('disconnected', undefined, new errors.NotConnectedError('Connection to the server has been closed.'));
}
},
disconnecting: {
_onEnter: (disconnectCallback, err) => {
this._disconnectClient(!!err, () => {
this._fsm.transition('disconnected', disconnectCallback, err);
});
},
'*': () => this._fsm.deferUntilTransition()
},
reconnecting: {
_onEnter: (callback) => {
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_033: [The `updateSharedAccessSignature` method shall disconnect and reconnect the mqtt client with the new `sharedAccessSignature`.]*/
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_035: [The `updateSharedAccessSignature` method shall call the `callback` argument with no parameters if the operation succeeds.]*/
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_036: [The `updateSharedAccessSignature` method shall call the `callback` argument with an `Error` if the operation fails.]*/
let switched = false;
/*Codes_SRS_NODE_COMMON_MQTT_BASE_41_002: [The `updateSharedAccessSignature` method shall trigger a forced disconnect if after 30 seconds the mqtt client has failed to complete a non-forced disconnect.]*/
/*Codes_SRS_NODE_COMMON_MQTT_BASE_41_003: [The `updateSharedAccessSignature` method shall call the `callback` argument with an `Error` if the operation fails after timing out.]*/
const disconnectTimeout = setTimeout(() => {
debugErrors('disconnecting mqtt client timed out. Force disconnecting.');
switched = true;
this._fsm.handle('forceDisconnect', callback);
}, 30000);
// When experiencing message loss due to high throughput, the force flag in
// disconnectClient can be set to true to cause messages to be properly dropped,
// and thus re-sent. Making this the default is under investigation.
debug('disconnecting mqtt client');
this._disconnectClient(false, () => {
clearTimeout(disconnectTimeout);
if (!switched) {
debug('mqtt client disconnected - reconnecting');
this._connectClient((err, connack) => {
if (err) {
debugErrors('failed to reconnect the client: ' + err.toString());
this._fsm.transition('disconnected', callback, err);
} else {
debug('mqtt client reconnected successfully');
this._mqttTrackedListeners.addTrackedListener('error', this._errorCallback.bind(this));
this._fsm.transition('connected', callback, connack);
}
});
}
});
},
forceDisconnect: (callback) => {
debug('force disconnecting mqtt client');
this._disconnectClient(true, () => {
debug('mqtt client disconnected - reconnecting');
this._connectClient((err, connack) => {
if (err) {
debugErrors('failed to reconnect the client: ' + err);
this._fsm.transition('disconnected', callback, err);
} else {
debug('mqtt client reconnected successfully');
this._fsm.transition('connected', callback, connack);
}
});
});
},
'*': () => this._fsm.deferUntilTransition()
}
}
});
this._fsm.on('transition', (data) => {
debug(data.fromState + ' -> ' + data.toState + ' (' + data.action + ')');
});
}