in src/common-amqp/amqp.ts [129:694]
constructor(autoSettleMessages: boolean) {
// node-amqp10 has an automatic reconnection/link re-attach feature that is enabled by default.
// In our case we want to control the reconnection flow ourselves, so we need to disable it.
/*Codes_SRS_NODE_COMMON_AMQP_16_042: [The Amqp constructor shall create a new `rhea.Client` instance and configure it to:
- not reconnect on failure
- not reattach sender and receiver links on failure]*/
this._rheaContainer = rheaCreateContainer();
this._rheaContainer.on('azure-iot-amqp-base:error-indicated', (err: AmqpError) => {
debugErrors('azure-iot-amqp-base:error-indicated invoked ' + getErrorName(err));
this._fsm.handle('amqpError', err);
});
const rheaErrorHandler = (context: EventContext) => {
debugErrors(`rhea error event handler: error = ${context.error}`);
this._fsm.handle('error', context);
};
const connectionErrorHandler = (context: EventContext) => {
debugErrors(`connection error event handler: error = ${context.error}`);
this._fsm.handle('connection_error', context);
};
const connectionCloseHandler = (context: EventContext) => {
debug('connection close event handler');
this._fsm.handle('connection_close', context);
};
const connectionOpenHandler = (context: EventContext) => {
debug('connection open event handler');
this._fsm.handle('connection_open', context);
};
const connectionDisconnectedHandler = (context: EventContext) => {
if (context.error) {
debugErrors(`connection disconnected event handler: error = ${context.error}`);
} else {
debug('connection disconnected event handler');
}
this._fsm.handle('disconnected', context);
};
const manageConnectionHandlers = (operation: string) => {
this._rheaConnection[operation]('connection_error', connectionErrorHandler);
this._rheaConnection[operation]('connection_open', connectionOpenHandler);
this._rheaConnection[operation]('connection_close', connectionCloseHandler);
this._rheaConnection[operation]('disconnected', connectionDisconnectedHandler);
this._rheaConnection[operation]('error', rheaErrorHandler);
};
const sessionErrorHandler = (context: EventContext) => {
debugErrors(`session error event handler: error = ${getErrorName(context.error)}`);
this._fsm.handle('session_error', context);
};
const sessionOpenHandler = (context: EventContext) => {
debug('session open event handler');
this._fsm.handle('session_open', context);
};
const sessionCloseHandler = (context: EventContext) => {
debug('session close event handler');
this._fsm.handle('session_close', context);
};
const manageSessionHandlers = (operation: string) => {
this._rheaSession[operation]('session_error', sessionErrorHandler);
this._rheaSession[operation]('session_open', sessionOpenHandler);
this._rheaSession[operation]('session_close', sessionCloseHandler);
};
this._fsm = new machina.Fsm({
namespace: 'amqp-base',
initialState: 'disconnected',
states: {
disconnected: {
_onEnter: (disconnectCallback, err, _result) => {
if (disconnectCallback) {
if (err) {
this._safeCallback(disconnectCallback, err);
} else {
this._safeCallback(disconnectCallback, null, new results.Disconnected());
}
} else if (this._disconnectHandler) {
debug('calling upper layer disconnect handler');
debugErrors('error passed to disconnect handler is: ' + getErrorName(err || new errors.NotConnectedError('rhea: connection disconnected')));
this._disconnectHandler(err || new errors.NotConnectedError('rhea: connection disconnected'));
}
},
amqpError: (err) => {
debugErrors('received an error while disconnected: maybe a bug: ' + (err ? err.name : 'falsy error object.'));
},
connect: (connectionParameters, connectCallback) => {
this._fsm.transition('connecting', connectionParameters, connectCallback);
},
disconnect: (callback) => callback(null, new results.Disconnected()),
attachSenderLink: (_endpoint, _linkOptions, callback) => callback(new errors.NotConnectedError()),
attachReceiverLink: (_endpoint, _linkOptions, callback) => callback(new errors.NotConnectedError()),
detachSenderLink: (_endpoint, callback) => this._safeCallback(callback),
detachReceiverLink: (_endpoint, callback) => this._safeCallback(callback),
initializeCBS: (callback) => callback(new errors.NotConnectedError()),
putToken: (_audience, _token, callback) => callback(new errors.NotConnectedError()),
},
connecting: {
_onEnter: (connectionParameters, connectCallback) => {
this._rheaContainer.options.sender_options = {
properties: {
'com.microsoft:client-version': this._config.userAgentString
},
reconnect: false
};
this._rheaContainer.options.receiver_options = {
properties: {
'com.microsoft:client-version': this._config.userAgentString
},
reconnect: false,
autoaccept: autoSettleMessages
};
this._connectionCallback = connectCallback;
this._indicatedConnectionError = undefined;
this._disconnectionOccurred = false;
this._sessionCloseOccurred = false;
this._connectionCloseOccurred = false;
//
// According to the rhea maintainers, one can depend on that fact that no actual network activity
// will occur until the nextTick() after the call to connect. Because of that, one can
// put the event handlers on the rhea connection object returned from the connect call and be assured
// that the listeners are in place BEFORE any possible events will be emitted on the connection.
//
this._rheaConnection = this._rheaContainer.connect(connectionParameters);
manageConnectionHandlers('on');
},
connection_open: (context: EventContext) => {
this._rheaConnection = context.connection;
const callback = this._connectionCallback;
this._connectionCallback = undefined;
this._fsm.transition('connecting_session', callback);
},
connection_close: (_context: EventContext) => {
const err = this._indicatedConnectionError;
const callback = this._connectionCallback;
this._indicatedConnectionError = undefined;
this._connectionCallback = undefined;
this._connectionCloseOccurred = true;
manageConnectionHandlers('removeListener');
this._fsm.transition('disconnected', callback, err);
},
connection_error: (context: EventContext) => {
this._indicatedConnectionError = context.connection.error as AmqpError;
},
error: (context: EventContext) => {
const callback = this._connectionCallback;
this._connectionCallback = undefined;
manageConnectionHandlers('removeListener');
this._fsm.transition('disconnected', callback, context.connection.error);
},
disconnected: (_context: EventContext) => {
const callback = this._connectionCallback;
this._connectionCallback = undefined;
manageConnectionHandlers('removeListener');
this._fsm.transition('disconnected', callback, new errors.NotConnectedError('rhea: connection disconnected'));
},
'*': () => this._fsm.deferUntilTransition()
},
connecting_session: {
_onEnter: (connectCallback, result) => {
this._sessionCallback = connectCallback;
this._sessionResult = result;
//
// According to the rhea maintainers, one can depend on that fact that no actual network activity
// will occur until the nextTick() after the call to create_session. Because of that, one can
// put the event handlers on the rhea session object returned from the create_session call and be assured
// that the listeners are in place BEFORE any possible events will be emitted on the session.
//
this._rheaSession = this._rheaConnection.create_session();
manageSessionHandlers('on');
this._rheaSession.open();
},
session_open: (_context: EventContext) => {
const callback = this._sessionCallback;
const result = this._sessionResult;
this._sessionCallback = undefined;
this._sessionResult = undefined;
this._fsm.transition('connected', callback, result);
},
session_error: (context: EventContext) => {
this._indicatedSessionError = context.session.error;
},
session_close: (_context: EventContext) => {
const err = this._indicatedSessionError;
const callback = this._sessionCallback;
this._indicatedSessionError = undefined;
this._sessionCallback = undefined;
this._sessionCloseOccurred = true;
this._fsm.transition('disconnecting', callback, err);
},
connection_error: (context: EventContext) => {
this._indicatedConnectionError = context.connection.error;
},
connection_close: (_context: EventContext) => {
const err = this._indicatedConnectionError;
const callback = this._sessionCallback;
//
// We lie about session close coming in. Thing is that if we are here we don't actually have
// a good session set up. This way we won't wait around for a session end that probably won't come.
//
this._sessionCloseOccurred = true;
this._indicatedConnectionError = undefined;
this._sessionCallback = undefined;
this._connectionCloseOccurred = true;
this._fsm.transition('disconnecting', callback, err);
},
error: (context: EventContext) => {
const callback = this._sessionCallback;
//
// We lie about session close coming in. Thing is that if we are here we don't actually have
// a good session set up. This way we won't wait around for a session end that probably won't come.
//
this._sessionCloseOccurred = true;
this._sessionCallback = undefined;
this._fsm.transition('disconnecting', callback, context.connection.error);
},
disconnected: (_context: EventContext) => {
const callback = this._sessionCallback;
this._sessionCallback = undefined;
manageConnectionHandlers('removeListener');
this._fsm.transition('disconnected', callback, new errors.NotConnectedError('rhea: connection disconnected'));
},
amqpError: (err) => {
this._fsm.transition('disconnecting', null, err);
},
'*': () => this._fsm.deferUntilTransition()
},
connected: {
_onEnter: (connectCallback, result) => {
/*Codes_SRS_NODE_COMMON_AMQP_16_002: [The `connect` method shall establish a connection with the IoT hub instance and if given as argument call the `done` callback with a null error object in the case of success and a `results.Connected` object.]*/
this._safeCallback(connectCallback, null, new results.Connected(result));
},
session_error: (context: EventContext) => {
this._indicatedSessionError = context.session.error;
},
session_close: (_context: EventContext) => {
const err = this._indicatedSessionError;
this._indicatedSessionError = undefined;
this._sessionCloseOccurred = true;
this._fsm.transition('disconnecting', null, err);
},
connection_error: (context: EventContext) => {
this._indicatedConnectionError = context.connection.error;
},
connection_close: (_context: EventContext) => {
const err = this._indicatedConnectionError;
this._indicatedConnectionError = undefined;
this._connectionCloseOccurred = true;
this._fsm.transition('disconnecting', null, err);
},
error: (context: EventContext) => {
if (context && context.connection) {
this._fsm.transition('disconnecting', null, context.connection.error);
} else {
this._fsm.transition('disconnecting', null, context);
}
},
disconnected: (_context: EventContext) => {
this._disconnectionOccurred = true;
this._fsm.transition('disconnecting', null, new errors.NotConnectedError('rhea: connection disconnected'));
},
amqpError: (err) => {
this._fsm.transition('disconnecting', null, err);
},
connect: (_policyOverride, callback) => callback(null, new results.Connected()),
disconnect: (disconnectCallback) => {
this._fsm.transition('disconnecting', disconnectCallback);
},
initializeCBS: (callback) => {
this._cbs = new ClaimsBasedSecurityAgent(this._rheaSession);
this._cbs.attach(callback);
},
putToken: (audience, token, callback) => {
if (!this._cbs) {
this._fsm.handle('initializeCBS', (err) => {
if (err) {
callback(err);
} else {
this._fsm.handle('putToken', audience, token, callback);
}
});
} else {
this._cbs.putToken(audience, token, callback);
}
},
send: (message: Message, endpoint: string, to: string, done: GenericAmqpBaseCallback<any>): void => {
/*Codes_SRS_NODE_COMMON_AMQP_16_006: [The `send` method shall construct an AMQP message using information supplied by the caller, as follows:
The ‘to’ field of the message should be set to the ‘to’ argument.
The ‘body’ of the message should be built using the message argument.] */
debug('call to deprecated api \'azure-iot-amqp-base.Amqp.send\'. You should be using SenderLink.send instead');
const amqpMessage = AmqpMessage.fromMessage(message);
if (to !== undefined) {
amqpMessage.to = to;
}
if (!this._senders[endpoint]) {
this._fsm.handle('attachSenderLink', endpoint, null, (err) => {
if (err) {
debugErrors('failed to attach the sender link: ' + getErrorName(err));
done(err);
} else {
(this._senders[endpoint] as SenderLink).send(amqpMessage, done);
}
});
} else {
(this._senders[endpoint] as SenderLink).send(amqpMessage, done);
}
},
getReceiver: (endpoint: string, done: GenericAmqpBaseCallback<ReceiverLink>): void => {
/*Codes_SRS_NODE_COMMON_AMQP_16_010: [If a receiver for this endpoint doesn’t exist, the getReceiver method should create a new AmqpReceiver object and then call the done() method with the object that was just created as an argument.] */
if (!this._receivers[endpoint]) {
this._fsm.handle('attachReceiverLink', endpoint, null, (err) => {
if (err) {
done(err);
} else {
done(null, this._receivers[endpoint]);
}
});
} else {
/*Codes_SRS_NODE_COMMON_AMQP_16_009: [If a receiver for this endpoint has already been created, the getReceiver method should call the done() method with the existing instance as an argument.] */
done(null, this._receivers[endpoint]);
}
},
attachReceiverLink: (endpoint: string, linkOptions: any, done: GenericAmqpBaseCallback<ReceiverLink>): void => {
debug('creating receiver link for: ' + endpoint);
this._receivers[endpoint] = new ReceiverLink(endpoint, linkOptions, this._rheaSession);
const permanentErrorHandler = (_err) => {
delete(this._receivers[endpoint]);
};
const operationErrorHandler = (err) => {
done(err);
};
this._receivers[endpoint].on('error', permanentErrorHandler);
this._receivers[endpoint].on('error', operationErrorHandler);
/*Codes_SRS_NODE_COMMON_AMQP_06_006: [The `attachReceiverLink` method shall call `attach` on the `ReceiverLink` object.] */
this._receivers[endpoint].attach((err) => {
if (err) {
permanentErrorHandler(err);
operationErrorHandler(err);
} else {
this._receivers[endpoint].removeListener('error', operationErrorHandler);
debug('receiver link attached: ' + endpoint);
done(null, this._receivers[endpoint]);
}
});
},
attachSenderLink: (endpoint: string, linkOptions: any, done: GenericAmqpBaseCallback<any>): void => {
debug('creating sender link for: ' + endpoint);
this._senders[endpoint] = new SenderLink(endpoint, linkOptions, this._rheaSession);
const permanentErrorHandler = (_err) => {
delete(this._senders[endpoint]);
};
const operationErrorHandler = (err) => {
done(err);
};
this._senders[endpoint].on('error', permanentErrorHandler);
this._senders[endpoint].on('error', operationErrorHandler);
/*Codes_SRS_NODE_COMMON_AMQP_06_005: [The `attachSenderLink` method shall call `attach` on the `SenderLink` object.] */
this._senders[endpoint].attach((err) => {
if (err) {
permanentErrorHandler(err);
operationErrorHandler(err);
} else {
this._senders[endpoint].removeListener('error', operationErrorHandler);
done(null, this._senders[endpoint]);
}
});
},
detachReceiverLink: (endpoint: string, detachCallback: GenericAmqpBaseCallback<any>): void => {
if (!this._receivers[endpoint]) {
this._safeCallback(detachCallback);
} else {
this._detachLink(this._receivers[endpoint], (err?) => {
delete(this._receivers[endpoint]);
this._safeCallback(detachCallback, err);
});
}
},
detachSenderLink: (endpoint: string, detachCallback: GenericAmqpBaseCallback<any>): void => {
this._detachLink(this._senders[endpoint], (err?) => {
delete(this._senders[endpoint]);
this._safeCallback(detachCallback, err);
});
}
},
disconnecting: {
_onEnter: (disconnectCallback, err) => {
debug('Entering disconnecting state with disconnectCallback: ' + disconnectCallback + ' error of: ' + getErrorName(err));
const sessionEnd = (callback) => {
//
// If a disconnection has already happened then there is no point in trying to send a session close.
// Just be done.
//
if (this._disconnectionOccurred) {
callback();
} else {
//
// A session close may have already been received from the peer. If we are disconnecting because of a session error as an example.
// We should send a session close from our end BUT, we should not expect to receive back another session close in response.
//
this._rheaSession.close();
if (this._sessionCloseOccurred) {
callback();
} else {
this._sessionCallback = callback; // So that the session_close handler for this disconnecting state can invoke the callback.
}
}
};
const disconnect = (callback) => {
debug('entering disconnect function of disconnecting state');
if (err) {
debugErrors('with a disconnecting state err: ' + getErrorName(err));
}
//
// If a disconnection has already occurred there is no point in generating any network traffic.
//
if (this._disconnectionOccurred) {
debug('in disconnecting state - a disconnect had already been detected. No point in doing anything.');
callback(err);
} else {
//
// A connection close may have already been received from the peer. If we are disconnecting because of a connection error as an example.
// We should send a connection close from our end BUT, we should not expect to receive back another connection close in response.
//
debug('disconnect in disconnecting state is about send a close to the peer.');
this._rheaConnection.close();
if (this._connectionCloseOccurred) {
callback(err);
} else {
this._connectionCallback = callback;
}
}
};
const detachLink = (link, callback) => {
if (!link) {
return callback();
}
if (this._sessionCloseOccurred || this._disconnectionOccurred) {
debugErrors('forceDetaching link: ' + link);
link.forceDetach(err);
callback();
} else {
debug('cleanly detaching link: ' + link);
link.detach(callback, err);
}
};
const remainingLinks = [];
for (const senderEndpoint in this._senders) {
if (Object.prototype.hasOwnProperty.call(this._senders, senderEndpoint)) {
remainingLinks.push(this._senders[senderEndpoint]);
delete this._senders[senderEndpoint];
}
}
for (const receiverEndpoint in this._receivers) {
if (Object.prototype.hasOwnProperty.call(this._receivers, receiverEndpoint)) {
remainingLinks.push(this._receivers[receiverEndpoint]);
delete this._receivers[receiverEndpoint];
}
}
//
// Depending on the mode of failure it is possible that no network activity is occurring.
// However, the SDK *might* not have noticed that yet.
//
// If this happens then rundown code will simply stall waiting for a response which will never
// occur.
//
// To deal with this, we encapsulate the rundown code in a function and we invoke it.
//
// First, however, we will spin off a timer to execute the very same rundown code in 45 seconds, but that
// function invocation will use API that do NOT expect a reply.
//
// If the initial function invocation actually doesn't stall due to lack of communication, it's
// last action will be to clear the timeout and consequently the timeout invocation call to the
// rundown code will NOT occur.
//
let rerunWithForceTimer: any = undefined;
const rundownConnection = (force: boolean) => {
//
// Note that _disconnectionOccurred could already be true on the first run (the NON setTimer invocation).
// Thus, we don't simply want to set it to the value of force.
//
if (force) {
this._disconnectionOccurred = true;
}
detachLink(this._cbs, () => {
/*Codes_SRS_NODE_COMMON_AMQP_16_034: [The `disconnect` method shall detach all open links before disconnecting the underlying AMQP client.]*/
async.each(remainingLinks, detachLink, () => {
sessionEnd((sessionError) => {
disconnect((disconnectError) => {
clearTimeout(rerunWithForceTimer);
manageConnectionHandlers('removeListener');
const finalError = err || sessionError || disconnectError;
this._fsm.transition('disconnected', disconnectCallback, finalError);
});
});
});
});
};
/*Codes_SRS_NODE_COMMON_AMQP_06_007: [While disconnecting, if the run down does not complete within 45 seconds, the code will be re-run with `forceDetach`es.]*/
rerunWithForceTimer = setTimeout(() => {
debugErrors('the normal rundown expired without completion. Do it again with force detaches.');
rundownConnection(true);
}, 45000);
rundownConnection(false);
},
session_close: (_context: EventContext) => {
const err = this._indicatedSessionError;
this._indicatedSessionError = undefined;
const callback = this._sessionCallback;
this._sessionCallback = undefined;
this._sessionCloseOccurred = true;
if (callback) {
callback(err);
}
},
session_error: (context: EventContext) => {
this._indicatedSessionError = context.session.error;
},
connection_error: (context: EventContext) => {
this._indicatedConnectionError = context.connection.error;
},
connection_close: (_context: EventContext) => {
const err = this._indicatedConnectionError;
const callback = this._connectionCallback;
this._indicatedConnectionError = undefined;
this._connectionCloseOccurred = true;
/*Codes_SRS_NODE_COMMON_AMQP_16_004: [The disconnect method shall call the done callback when the application/service has been successfully disconnected from the service] */
if (callback) {
callback(err);
}
},
error: (_context: EventContext) => {
debugErrors('ignoring error events while disconnecting');
},
disconnected: (_context: EventContext) => {
this._disconnectionOccurred = true;
},
amqpError: (err) => {
debugErrors('ignoring error event while disconnecting: ' + getErrorName(err));
},
'*': () => this._fsm.deferUntilTransition('disconnected')
}
}
});
this._fsm.on('transition', (transition) => {
debug(transition.fromState + ' -> ' + transition.toState + ' (action:' + transition.action + ')');
});
}