in sdk/servicebus/service-bus/src/core/batchingReceiver.ts [384:592]
/**
 * Wires up a single receiveMessages() operation against an already-open AMQP
 * receiver link.
 *
 * Registers message/error/close handlers on the receiver and its session, adds
 * credit for up to `args.maxMessageCount` messages, and starts a timer for
 * `args.maxWaitTimeInMs`. The operation settles exactly once via
 * `origResolve`/`origReject`; every settlement path first runs
 * `cleanupBeforeResolveOrReject`, which removes all listeners, clears the wait
 * timer, and deregisters the abort-signal callback.
 *
 * @param receiver - The receiver link to receive messages on.
 * @param args - Limits for the operation: max message count, max wait time,
 * max time after the first message, and an optional abort signal.
 * @param origResolve - Resolves the caller's promise with the collected messages.
 * @param origReject - Rejects the caller's promise with a (translated) error.
 */
private _receiveMessagesImpl(
  receiver: MinimalReceiver,
  args: ReceiveMessageArgs,
  origResolve: (messages: ServiceBusMessageImpl[]) => void,
  origReject: (err: Error | AmqpError) => void,
): void {
  const getRemainingWaitTimeInMs = this._getRemainingWaitTimeInMsFn(
    args.maxWaitTimeInMs,
    args.maxTimeAfterFirstMessageInMs,
  );
  // Messages accumulated so far for this operation.
  const brokeredMessages: ServiceBusMessageImpl[] = [];
  const loggingPrefix = `[${receiver.connection.id}|r:${receiver.name}]`;
  let totalWaitTimer: NodeJS.Timeout | undefined;
  // Assigned further below, after the handlers it must deregister exist;
  // declared first so the settle helpers can close over it.
  // eslint-disable-next-line prefer-const
  let cleanupBeforeResolveOrReject: () => void;
  const rejectAfterCleanup = (err: Error | AmqpError): void => {
    cleanupBeforeResolveOrReject();
    origReject(err);
  };
  const resolveImmediately = (result: ServiceBusMessageImpl[]): void => {
    cleanupBeforeResolveOrReject();
    origResolve(result);
  };
  const resolveAfterPendingMessageCallbacks = (result: ServiceBusMessageImpl[]): void => {
    // NOTE: through rhea-promise, most of our event handlers are made asynchronous by calling setTimeout(emit).
    // However, a small set (*error and drain) execute immediately. This can lead to a situation where the logical
    // ordering of events is correct but the execution order is incorrect because the events are not all getting
    // put into the task queue the same way.
    // setTimeout() ensures that we resolve _after_ any already-queued onMessage handlers that may
    // be waiting in the task queue.
    setTimeout(() => {
      cleanupBeforeResolveOrReject();
      origResolve(result);
    });
  };
  // Rejects the operation when the link or its session surfaces an error.
  const onError: OnAmqpEvent = (context: EventContext) => {
    const eventType = context.session?.error != null ? "session_error" : "receiver_error";
    let error = context.session?.error || context.receiver?.error;
    if (error) {
      error = translateServiceBusError(error);
      logger.logError(error, `${loggingPrefix} '${eventType}' event occurred. Received an error`);
    } else {
      // Defensive: the error event fired without a populated error object.
      error = new ServiceBusError("An error occurred while receiving messages.", "GeneralError");
    }
    rejectAfterCleanup(error);
  };
  // Invoked when the owning receiver is being closed mid-operation.
  this._closeHandler = (error?: AmqpError | Error): void => {
    if (
      // no error, just closing. Go ahead and return what we have.
      error == null ||
      // Return the collected messages if in ReceiveAndDelete mode because otherwise they are lost forever
      (this._receiveMode === "receiveAndDelete" && brokeredMessages.length)
    ) {
      logger.verbose(
        `${loggingPrefix} Closing. Resolving with ${brokeredMessages.length} messages.`,
      );
      return resolveAfterPendingMessageCallbacks(brokeredMessages);
    }
    rejectAfterCleanup(translateServiceBusError(error));
  };
  let abortSignalCleanupFunction: (() => void) | undefined = undefined;
  // Final action to be performed after
  // - maxMessageCount is reached or
  // - maxWaitTime is passed or
  // - newMessageWaitTimeoutInSeconds is passed since the last message was received
  this._finalAction = async (): Promise<void> => {
    if (receiver.drain) {
      // If a drain is already in process then we should let it complete. Some messages might still be in flight, but they will
      // arrive before the drain completes.
      logger.verbose(`${loggingPrefix} Already draining.`);
      return;
    }
    const remainingWaitTimeInMs = getRemainingWaitTimeInMs();
    await this.tryDrainReceiver(receiver, loggingPrefix, remainingWaitTimeInMs, args.abortSignal);
    logger.verbose(
      `${loggingPrefix} Resolving receiveMessages() with ${brokeredMessages.length} messages.`,
    );
    resolveImmediately(brokeredMessages);
  };
  // Action to be performed on the "message" event.
  const onReceiveMessage: OnAmqpEventAsPromise = async (context: EventContext) => {
    // TODO: this appears to be aggravating a bug that we need to look into more deeply.
    // The same timeout+drain sequence should work fine for receiveAndDelete but it appears
    // to cause problems.
    if (this._receiveMode === "peekLock") {
      if (brokeredMessages.length === 0) {
        // We'll now remove the old timer (which was the overall `maxWaitTimeMs` timer)
        // and replace it with another timer that is a (probably) much shorter interval.
        //
        // This allows the user to get access to received messages earlier and also gives us
        // a chance to have fewer messages internally that could get lost if the user's
        // app crashes.
        if (totalWaitTimer) clearTimeout(totalWaitTimer);
        const remainingWaitTimeInMs = getRemainingWaitTimeInMs();
        totalWaitTimer = setTimeout(() => {
          logger.verbose(
            `${loggingPrefix} Batching, waited for ${remainingWaitTimeInMs} milliseconds after receiving the first message.`,
          );
          this._finalAction!();
        }, remainingWaitTimeInMs);
      }
    }
    try {
      const data: ServiceBusMessageImpl = this._createServiceBusMessage(context);
      brokeredMessages.push(data);
      // NOTE: we used to actually "lose" any extra messages. At this point I've fixed the areas that were causing us to receive
      // extra messages but if this bug arises in some other way it's better to return the message than it would be to let it be
      // silently dropped on the floor.
      if (brokeredMessages.length > args.maxMessageCount) {
        logger.warning(
          `More messages arrived than expected: ${args.maxMessageCount} vs ${brokeredMessages.length}`,
        );
      }
    } catch (err: any) {
      const errObj = err instanceof Error ? err : new Error(JSON.stringify(err));
      logger.logError(
        err,
        `${loggingPrefix} Received an error while converting AmqpMessage to ServiceBusMessage`,
      );
      rejectAfterCleanup(errObj);
      // BUGFIX: the operation has already been rejected and all listeners removed —
      // falling through to the maxMessageCount check below could kick off a drain
      // and a redundant resolution attempt on an already-settled operation.
      return;
    }
    if (brokeredMessages.length >= args.maxMessageCount) {
      this._finalAction!();
    }
  };
  // Closes are logged only; actual settlement for closes flows through
  // `this._closeHandler` / `onError` above.
  const onClose: OnAmqpEventAsPromise = async (context: EventContext) => {
    const type = context.session?.error != null ? "session_closed" : "receiver_closed";
    const error = context.session?.error || context.receiver?.error;
    if (error) {
      logger.logError(error, `${loggingPrefix} '${type}' event occurred. The associated error`);
    }
  };
  // Tears down everything this operation attached: listeners on the receiver
  // and session, the wait timer, and the abort-signal registration. Runs on
  // every resolve/reject path (see the settle helpers above).
  cleanupBeforeResolveOrReject = (): void => {
    if (receiver != null) {
      receiver.removeListener(ReceiverEvents.receiverError, onError);
      receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
      receiver.session.removeListener(SessionEvents.sessionError, onError);
      receiver.removeListener(ReceiverEvents.receiverClose, onClose);
      receiver.session.removeListener(SessionEvents.sessionClose, onClose);
    }
    if (totalWaitTimer) {
      clearTimeout(totalWaitTimer);
    }
    if (abortSignalCleanupFunction) {
      abortSignalCleanupFunction();
    }
    abortSignalCleanupFunction = undefined;
  };
  abortSignalCleanupFunction = checkAndRegisterWithAbortSignal((err) => {
    if (receiver.drain) {
      // If a drain is already in process and we cancel, the link state may be out of sync
      // with remote. Reset the link so that we will have fresh start.
      receiver.close();
    }
    rejectAfterCleanup(err);
  }, args.abortSignal);
  // By adding credit here, we let the service know that at max we can handle `maxMessageCount`
  // number of messages concurrently. We will return the user an array of messages that can
  // be of size upto maxMessageCount. Then the user needs to accordingly dispose
  // (complete/abandon/defer/deadletter) the messages from the array.
  const creditToAdd = args.maxMessageCount - receiver.credit;
  logger.verbose(
    `${loggingPrefix} Ensure enough credit for receiving ${args.maxMessageCount} messages. Current: ${receiver.credit}. To add: ${creditToAdd}.`,
  );
  if (creditToAdd > 0) {
    receiver.addCredit(creditToAdd);
  }
  logger.verbose(
    `${loggingPrefix} Setting the wait timer for ${args.maxWaitTimeInMs} milliseconds.`,
  );
  // Overall deadline for the whole operation; may be replaced with a shorter
  // timer once the first message arrives (see onReceiveMessage, peekLock path).
  totalWaitTimer = setTimeout(() => {
    logger.verbose(
      `${loggingPrefix} Batching, waited for max wait time ${args.maxWaitTimeInMs} milliseconds.`,
    );
    this._finalAction!();
  }, args.maxWaitTimeInMs);
  receiver.on(ReceiverEvents.message, onReceiveMessage);
  receiver.on(ReceiverEvents.receiverError, onError);
  receiver.on(ReceiverEvents.receiverClose, onClose);
  receiver.session.on(SessionEvents.sessionError, onError);
  receiver.session.on(SessionEvents.sessionClose, onClose);
}