in common/transport/amqp/src/receiver_link.ts [78:395]
constructor(linkAddress: string, linkOptions: ReceiverOptions, session: Session) {
super();
this._linkAddress = linkAddress;
this._rheaSession = session;
this._undisposedDeliveries = [];
this._combinedOptions = {
source: linkAddress
};
if (linkOptions) {
for (const k in linkOptions) {
this._combinedOptions[k] = linkOptions[k];
}
}
const receiverOpenHandler = (context: EventContext): void => {
this._fsm.handle('receiverOpenEvent', context);
};
const receiverCloseHandler = (context: EventContext): void => {
this._fsm.handle('receiverCloseEvent', context);
};
const receiverErrorHandler = (context: EventContext): void => {
this._fsm.handle('receiverErrorEvent', context);
};
/*Codes_SRS_NODE_AMQP_RECEIVER_LINK_16_012: [If a `message` event is emitted by the `rhea` link object, the `ReceiverLink` object shall emit a `message` event with the same content.]*/
const receiverMessageHandler = (context: EventContext): void => {
this._undisposedDeliveries.push({ msg: context.message as any, delivery: context.delivery });
this.emit('message', context.message);
};
const manageReceiverHandlers = (operation: string) => {
this._rheaReceiver[operation]('receiver_error', receiverErrorHandler);
this._rheaReceiver[operation]('receiver_close', receiverCloseHandler);
this._rheaReceiver[operation]('receiver_open', receiverOpenHandler);
this._rheaReceiver[operation]('message', receiverMessageHandler);
};
this._fsm = new machina.Fsm({
/*Codes_SRS_NODE_AMQP_RECEIVER_LINK_16_001: [The `ReceiverLink` internal state machine shall be initialized in the `detached` state.]*/
initialState: 'detached',
namespace: 'receiverLink',
states: {
detached: {
_onEnter: (callback, err) => {
this._rheaReceiver = null;
this._rheaReceiverName = null;
if (callback) {
this._safeCallback(callback,err);
} else if (err) {
/*Codes_SRS_NODE_AMQP_RECEIVER_LINK_16_011: [If a `receiver_close` or `receiver_error` event is emitted by the `rhea` link object, the `ReceiverLink` object shall forward that error to the client.]*/
this.emit('error', err);
}
},
attach: (callback) => {
this._fsm.transition('attaching', callback);
},
detach: (callback, err) => {
/*Codes_SRS_NODE_AMQP_RECEIVER_LINK_06_004: [If the `ReceiverLink` is already in the detached state an invocation of `detach` shall immediately invoke the callback with the (potentially) supplied error parameter.] */
debug(this.toString() + ': while detached - detach for receiver link ' + this._linkAddress);
this._safeCallback(callback, err);
},
forceDetach: () => {
debugErrors(this.toString() + ': while detached - force detach for receiver link ' + this._linkAddress);
/*Codes_SRS_NODE_AMQP_RECEIVER_LINK_16_028: [The `forceDetach` method shall return immediately if the link is already detached.]*/
return;
},
accept: (_message, callback) => callback(new errors.DeviceMessageLockLostError()),
reject: (_message, callback) => callback(new errors.DeviceMessageLockLostError()),
abandon: (_message, callback) => callback(new errors.DeviceMessageLockLostError())
},
attaching: {
_onEnter: (callback) => {
/*Codes_SRS_NODE_AMQP_RECEIVER_LINK_16_004: [The `attach` method shall use the stored instance of the `rhea` session object to attach a new link object with the combined `linkAddress` and `linkOptions` provided when creating the `ReceiverLink` instance.]*/
this._attachingCallback = callback;
this._indicatedError = undefined;
this._receiverCloseOccurred = false;
this._rheaReceiverName = 'rheaReceiver_' + uuid.v4();
this._combinedOptions.name = this._rheaReceiverName;
debug(this.toString() + ': attaching receiver name: ' + this._rheaReceiverName + ' with address: ' + this._linkAddress);
//
// According to the rhea maintainers, one can depend on that fact that no actual network activity
// will occur until the nextTick() after the call to open_receiver. Because of that, one can
// put the event handlers on the rhea link returned from the open_receiver call and be assured
// that the listeners are in place BEFORE any possible events will be emitted on the link.
//
this._rheaReceiver = this._rheaSession.open_receiver(this._combinedOptions);
manageReceiverHandlers('on');
},
receiverOpenEvent: (context: EventContext) => {
debug(this.toString() + ': In receiver attaching state - open event for ' + context.receiver.name);
const callback = this._attachingCallback;
this._attachingCallback = null;
this._fsm.transition('attached', callback);
},
receiverErrorEvent: (context: EventContext) => {
debugErrors(this.toString() + ': In receiver attaching state - error event for ' + context.receiver.name + ' error is: ' + getErrorName(context.receiver.error));
this._indicatedError = context.receiver.error;
//
// We don't transition at this point in that we are guaranteed that the error will be followed by a receiver_close
// event.
//
},
receiverCloseEvent: (context: EventContext) => {
debug(this.toString() + ': In receiver attaching state - close event for ' + context.receiver.name);
//
// We enabled the event listeners on the onEnter handler. They are to stay alive until we
// are about to transition to the detached state.
// We are about to transition to the detached state, so clean up.
//
manageReceiverHandlers('removeListener');
const error = this._indicatedError;
const callback = this._attachingCallback;
this._indicatedError = undefined;
this._attachingCallback = undefined;
this._receiverCloseOccurred = true;
this._fsm.transition('detached', callback, error);
},
attach: (null),
detach: (callback, err) => {
/*Codes_SRS_NODE_AMQP_RECEIVER_LINK_06_002: [If the `detach` method is invoked on the `ReceiverLink` while still attaching, the ReceiverLink shall detach. It will indicate the error to the callback for the `detach` as well as the callback to the `attach`.] */
debug(this.toString() + ': Detaching while attaching of rhea receiver link ' + this._linkAddress);
manageReceiverHandlers('removeListener');
//
// We may have a callback outstanding from the request that started the attaching.
// We will signal to that callback that an error occurred. We will also invoke the callback supplied
// for this detach.
//
const error = err || this._indicatedError || new Error('Unexpected link detach while attaching');
const attachingCallback = this._attachingCallback;
this._indicatedError = undefined;
this._attachingCallback = undefined;
if (attachingCallback) {
attachingCallback(error);
}
this._fsm.transition('detached', callback, error);
},
forceDetach: (err) => {
/*Codes_SRS_NODE_AMQP_RECEIVER_LINK_06_003: [If the `forceDetach` method is invoked on the `ReceiverLink` while still attaching, the ReceiverLink shall detach. With the error supplied to the forceDetach, the `attach` callback will also be invoked. If the error is NOT falsy it will also be emitted as the argument to the `error` event.] */
debug(this.toString() + ': Force detaching while attaching of rhea receiver link ' + this._linkAddress);
manageReceiverHandlers('removeListener');
const error = err || this._indicatedError;
const attachingCallback = this._attachingCallback;
this._indicatedError = undefined;
this._attachingCallback = undefined;
attachingCallback(error);
this._fsm.transition('detached', undefined, error);
},
accept: (_message, callback) => callback(new errors.DeviceMessageLockLostError()),
reject: (_message, callback) => callback(new errors.DeviceMessageLockLostError()),
abandon: (_message, callback) => callback(new errors.DeviceMessageLockLostError())
},
attached: {
_onEnter: (callback, err) => {
if (callback) callback(err);
},
receiverErrorEvent: (context: EventContext) => {
debugErrors(this.toString() + ': In receiver attached state - error event for ' + context.receiver.name + ' error is: ' + getErrorName(context.receiver.error));
this._indicatedError = context.receiver.error;
//
// We don't transition at this point in that we are guaranteed that the error will be followed by a receiver_close
// event.
//
},
receiverCloseEvent: (context: EventContext) => {
/*Codes_SRS_NODE_AMQP_RECEIVER_LINK_06_009: [If a `receiver_close` event received with no preceding error, the link shall be closed with no error.] */
//
// We have a close (which is how the amqp detach performative is surfaced). It could be because of an error that was
// already indicated by the receiver_error event. Or it could simply be that for some reason (disconnect tests?)
// the service side had decided to shut down the link.
//
const error = this._indicatedError; // This might be undefined.
this._indicatedError = undefined;
this._receiverCloseOccurred = true;
debugErrors(this.toString() + ': In receiver attached state - close event for ' + context.receiver.name + ' already indicated error is: ' + getErrorName(error));
if (error) {
context.container.emit('azure-iot-amqp-base:error-indicated', error);
} else {
this._fsm.transition('detaching');
}
},
attach: (callback) => {
/*Codes_SRS_NODE_AMQP_RECEIVER_LINK_06_001: [The `attach` method shall immediately invoke the `callback` if already in an attached state.] */
this._safeCallback(callback);
},
detach: (callback, err) => {
debug(this.toString() + ': While attached - detach for receiver link ' + this._linkAddress + ' callback: ' + callback + ' error: ' + getErrorName(err));
this._fsm.transition('detaching', callback, err);
},
forceDetach: (err) => {
debugErrors(this.toString() + ': While attached - force detach for receiver link ' + this._linkAddress);
manageReceiverHandlers('removeListener');
/*Codes_SRS_NODE_AMQP_RECEIVER_LINK_16_027: [** The `forceDetach` method shall call the `remove` method on the underlying `rhea` link object.]*/
this._rheaReceiver.remove();
/*Codes_SRS_NODE_AMQP_RECEIVER_LINK_06_008: [The `forceDetach` method shall cause an `error` event to be emitted on the `ReceiverLink` if an error is supplied.] */
this._fsm.transition('detached', undefined, err);
},
accept: (message, callback) => {
const delivery = this._findDeliveryRecord(message);
if (delivery) {
/*Codes_SRS_NODE_AMQP_RECEIVER_LINK_16_022: [** The `accept` method shall work whether a `callback` is specified or not, and call the callback with a `result.MessageCompleted` object if a callback is specified.]*/
delivery.accept();
this._safeCallback(callback, null, new results.MessageCompleted());
} else {
this._safeCallback(callback, new errors.DeviceMessageLockLostError());
}
},
reject: (message, callback) => {
const delivery = this._findDeliveryRecord(message);
if (delivery) {
/*Codes_SRS_NODE_AMQP_RECEIVER_LINK_16_023: [** The `reject` method shall work whether a `callback` is specified or not, and call the callback with a `result.MessageRejected` object if a callback is specified.]*/
delivery.reject();
this._safeCallback(callback, null, new results.MessageRejected());
} else {
this._safeCallback(callback, new errors.DeviceMessageLockLostError());
}
},
abandon: (message, callback) => {
const delivery = this._findDeliveryRecord(message);
if (delivery) {
/*Codes_SRS_NODE_AMQP_RECEIVER_LINK_16_024: [** The `abandon` method shall work whether a `callback` is specified or not, and call the callback with a `result.MessageAbandoned` object if a callback is specified.]*/
delivery.release();
this._safeCallback(callback, null, new results.MessageAbandoned());
} else {
this._safeCallback(callback, new errors.DeviceMessageLockLostError());
}
}
},
detaching: {
_onEnter: (callback, err) => {
/*Codes_SRS_NODE_AMQP_RECEIVER_LINK_16_025: [The `detach` method shall call the `callback` with an `Error` that caused the detach whether it succeeds or fails to cleanly detach the link.]*/
debug(this.toString() + ': Detaching of rhea receiver link ' + this._linkAddress);
this._detachingCallback = callback;
this._indicatedError = err;
/*Codes_SRS_NODE_AMQP_RECEIVER_LINK_16_009: [The `detach` method shall detach the link created by `rhea` object.] */
this._rheaReceiver.close();
if (this._receiverCloseOccurred) {
//
// There will be no response from the peer to our detach (close). Therefore no event handler will be invoked. Simply
// transition to detached now.
//
this._detachingCallback = undefined;
this._indicatedError = undefined;
this._fsm.transition('detached', callback, err);
}
},
receiverErrorEvent: (context: EventContext) => {
/*Codes_SRS_NODE_AMQP_RECEIVER_LINK_06_006: [An error occurring during a detach will be indicated in the error result of the `detach`.] */
debugErrors(this.toString() + ': In receiver detaching state - error event for ' + context.receiver.name + ' error is: ' + getErrorName(context.receiver.error));
this._indicatedError = this._indicatedError || context.receiver.error;
//
// We don't transition at this point in that we are guaranteed that the error will be followed by a receiver_close
// event.
//
},
receiverCloseEvent: (context: EventContext) => {
debugErrors(this.toString() + ': In receiver detaching state - close event for ' + context.receiver.name + ' already indicated error is: ' + getErrorName(this._indicatedError));
const error = this._indicatedError;
const callback = this._detachingCallback;
this._detachingCallback = undefined;
this._indicatedError = undefined;
manageReceiverHandlers('removeListener');
this._fsm.transition('detached', callback, error);
},
detach: (callback, err) => {
/*Codes_SRS_NODE_AMQP_RECEIVER_LINK_06_007: [If `detach` invoked while already detaching, it's callback will be invoked with an error. Whatever caused the original detaching will proceed.] */
//
// Note that we are NOT transitioning to the detached state.
// We are going to give the code a chance to complete normally.
// The caller was free to invoke forceDetach. That handler will
// ALWAYS transition to the detached state.
//
debug(this.toString() + ': While detaching - detach for receiver link ' + this._linkAddress);
if (callback) {
err = err || new Error('Detached invoked while detaching.');
this._safeCallback(callback, err);
}
},
forceDetach: (err) => {
/*Codes_SRS_NODE_AMQP_RECEIVER_LINK_06_005: [If `forceDetach` invoked while detaching, the detach will be completed with the error supplied to the `forceDetach` or an error indicating that the `detach` was preempted by the `forceDetach`.] */
debugErrors(this.toString() + ': While detaching - Force detaching for receiver link ' + this._linkAddress);
this._rheaReceiver.remove();
const detachCallback = this._detachingCallback;
const error = err || this._indicatedError || new Error('Detach preempted by force');
this._detachingCallback = undefined;
this._indicatedError = undefined;
manageReceiverHandlers('removeListener');
if (detachCallback) {
detachCallback(error);
}
this._fsm.transition('detached', undefined, err);
},
'*': () => {
this._fsm.deferUntilTransition('detached');
}
}
}
});
this._fsm.on('transition', (transition) => {
debug(this.toString() + ': ' + transition.fromState + ' -> ' + transition.toState + ' (action:' + transition.action + ')');
});
this.on('removeListener', (eventName) => {
// stop listening for AMQP events if our consumers stop listening for our events
if (eventName === 'message' && this.listeners('message').length === 0) {
this._fsm.handle('detach');
}
});
this.on('newListener', (eventName) => {
// lazy-init AMQP event listeners
if (eventName === 'message') {
this._fsm.handle('attach');
}
});
}