private _receiveMessagesImpl()

in sdk/servicebus/service-bus/src/core/batchingReceiver.ts [384:592]


  private _receiveMessagesImpl(
    receiver: MinimalReceiver,
    args: ReceiveMessageArgs,
    origResolve: (messages: ServiceBusMessageImpl[]) => void,
    origReject: (err: Error | AmqpError) => void,
  ): void {
    const getRemainingWaitTimeInMs = this._getRemainingWaitTimeInMsFn(
      args.maxWaitTimeInMs,
      args.maxTimeAfterFirstMessageInMs,
    );

    const brokeredMessages: ServiceBusMessageImpl[] = [];
    const loggingPrefix = `[${receiver.connection.id}|r:${receiver.name}]`;

    let totalWaitTimer: NodeJS.Timeout | undefined;
    // eslint-disable-next-line prefer-const
    let cleanupBeforeResolveOrReject: () => void;

    const rejectAfterCleanup = (err: Error | AmqpError): void => {
      cleanupBeforeResolveOrReject();
      origReject(err);
    };

    const resolveImmediately = (result: ServiceBusMessageImpl[]): void => {
      cleanupBeforeResolveOrReject();
      origResolve(result);
    };

    const resolveAfterPendingMessageCallbacks = (result: ServiceBusMessageImpl[]): void => {
      // NOTE: through rhea-promise, most of our event handlers are made asynchronous by calling setTimeout(emit).
      // However, a small set (*error and drain) execute immediately. This can lead to a situation where the logical
      // ordering of events is correct but the execution order is incorrect because the events are not all getting
      // put into the task queue the same way.
      // setTimeout() ensures that we resolve _after_ any already-queued onMessage handlers that may
      // be waiting in the task queue.
      setTimeout(() => {
        cleanupBeforeResolveOrReject();
        origResolve(result);
      });
    };

    const onError: OnAmqpEvent = (context: EventContext) => {
      const eventType = context.session?.error != null ? "session_error" : "receiver_error";
      let error = context.session?.error || context.receiver?.error;

      if (error) {
        error = translateServiceBusError(error);
        logger.logError(error, `${loggingPrefix} '${eventType}' event occurred. Received an error`);
      } else {
        error = new ServiceBusError("An error occurred while receiving messages.", "GeneralError");
      }
      rejectAfterCleanup(error);
    };

    this._closeHandler = (error?: AmqpError | Error): void => {
      if (
        // no error, just closing. Go ahead and return what we have.
        error == null ||
        // Return the collected messages if in ReceiveAndDelete mode because otherwise they are lost forever
        (this._receiveMode === "receiveAndDelete" && brokeredMessages.length)
      ) {
        logger.verbose(
          `${loggingPrefix} Closing. Resolving with ${brokeredMessages.length} messages.`,
        );
        return resolveAfterPendingMessageCallbacks(brokeredMessages);
      }

      rejectAfterCleanup(translateServiceBusError(error));
    };

    let abortSignalCleanupFunction: (() => void) | undefined = undefined;

    // Final action to be performed after
    // - maxMessageCount is reached or
    // - maxWaitTime is passed or
    // - newMessageWaitTimeoutInSeconds is passed since the last message was received
    this._finalAction = async (): Promise<void> => {
      if (receiver.drain) {
        // If a drain is already in process then we should let it complete. Some messages might still be in flight, but they will
        // arrive before the drain completes.
        logger.verbose(`${loggingPrefix} Already draining.`);
        return;
      }

      const remainingWaitTimeInMs = getRemainingWaitTimeInMs();
      await this.tryDrainReceiver(receiver, loggingPrefix, remainingWaitTimeInMs, args.abortSignal);
      logger.verbose(
        `${loggingPrefix} Resolving receiveMessages() with ${brokeredMessages.length} messages.`,
      );
      resolveImmediately(brokeredMessages);
    };

    // Action to be performed on the "message" event.
    const onReceiveMessage: OnAmqpEventAsPromise = async (context: EventContext) => {
      // TODO: this appears to be aggravating a bug that we need to look into more deeply.
      // The same timeout+drain sequence should work fine for receiveAndDelete but it appears
      // to cause problems.
      if (this._receiveMode === "peekLock") {
        if (brokeredMessages.length === 0) {
          // We'll now remove the old timer (which was the overall `maxWaitTimeMs` timer)
          // and replace it with another timer that is a (probably) much shorter interval.
          //
          // This allows the user to get access to received messages earlier and also gives us
          // a chance to have fewer messages internally that could get lost if the user's
          // app crashes.
          if (totalWaitTimer) clearTimeout(totalWaitTimer);
          const remainingWaitTimeInMs = getRemainingWaitTimeInMs();
          totalWaitTimer = setTimeout(() => {
            logger.verbose(
              `${loggingPrefix} Batching, waited for ${remainingWaitTimeInMs} milliseconds after receiving the first message.`,
            );
            this._finalAction!();
          }, remainingWaitTimeInMs);
        }
      }

      try {
        const data: ServiceBusMessageImpl = this._createServiceBusMessage(context);
        brokeredMessages.push(data);

        // NOTE: we used to actually "lose" any extra messages. At this point I've fixed the areas that were causing us to receive
        // extra messages but if this bug arises in some other way it's better to return the message than it would be to let it be
        // silently dropped on the floor.
        if (brokeredMessages.length > args.maxMessageCount) {
          logger.warning(
            `More messages arrived than expected: ${args.maxMessageCount} vs ${brokeredMessages.length}`,
          );
        }
      } catch (err: any) {
        const errObj = err instanceof Error ? err : new Error(JSON.stringify(err));
        logger.logError(
          err,
          `${loggingPrefix} Received an error while converting AmqpMessage to ServiceBusMessage`,
        );
        rejectAfterCleanup(errObj);
      }
      if (brokeredMessages.length >= args.maxMessageCount) {
        this._finalAction!();
      }
    };

    const onClose: OnAmqpEventAsPromise = async (context: EventContext) => {
      const type = context.session?.error != null ? "session_closed" : "receiver_closed";
      const error = context.session?.error || context.receiver?.error;

      if (error) {
        logger.logError(error, `${loggingPrefix} '${type}' event occurred. The associated error`);
      }
    };

    cleanupBeforeResolveOrReject = (): void => {
      if (receiver != null) {
        receiver.removeListener(ReceiverEvents.receiverError, onError);
        receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
        receiver.session.removeListener(SessionEvents.sessionError, onError);
        receiver.removeListener(ReceiverEvents.receiverClose, onClose);
        receiver.session.removeListener(SessionEvents.sessionClose, onClose);
      }

      if (totalWaitTimer) {
        clearTimeout(totalWaitTimer);
      }

      if (abortSignalCleanupFunction) {
        abortSignalCleanupFunction();
      }
      abortSignalCleanupFunction = undefined;
    };

    abortSignalCleanupFunction = checkAndRegisterWithAbortSignal((err) => {
      if (receiver.drain) {
        // If a drain is already in process and we cancel, the link state may be out of sync
        // with remote. Reset the link so that we will have fresh start.
        receiver.close();
      }
      rejectAfterCleanup(err);
    }, args.abortSignal);

    // By adding credit here, we let the service know that at max we can handle `maxMessageCount`
    // number of messages concurrently. We will return the user an array of messages that can
    // be of size upto maxMessageCount. Then the user needs to accordingly dispose
    // (complete/abandon/defer/deadletter) the messages from the array.
    const creditToAdd = args.maxMessageCount - receiver.credit;
    logger.verbose(
      `${loggingPrefix} Ensure enough credit for receiving ${args.maxMessageCount} messages. Current: ${receiver.credit}.  To add: ${creditToAdd}.`,
    );

    if (creditToAdd > 0) {
      receiver.addCredit(creditToAdd);
    }

    logger.verbose(
      `${loggingPrefix} Setting the wait timer for ${args.maxWaitTimeInMs} milliseconds.`,
    );

    totalWaitTimer = setTimeout(() => {
      logger.verbose(
        `${loggingPrefix} Batching, waited for max wait time ${args.maxWaitTimeInMs} milliseconds.`,
      );
      this._finalAction!();
    }, args.maxWaitTimeInMs);

    receiver.on(ReceiverEvents.message, onReceiveMessage);
    receiver.on(ReceiverEvents.receiverError, onError);
    receiver.on(ReceiverEvents.receiverClose, onClose);

    receiver.session.on(SessionEvents.sessionError, onError);
    receiver.session.on(SessionEvents.sessionClose, onClose);
  }