public async enqueue()

in src/queue/handlers/MessagesHandler.ts [178:311]


  public async enqueue(
    queueMessage: Models.QueueMessage,
    options: Models.MessagesEnqueueOptionalParams,
    context: Context
  ): Promise<Models.MessagesEnqueueResponse> {
    const queueCtx = new QueueStorageContext(context);
    const accountName = queueCtx.account!;
    const queueName = queueCtx.queue!;

    if (queueMessage.messageText === undefined) {
      const body = queueCtx.request!.getBody();

      // TODO: deserialize does not support the message text with only empty character.
      // If the text is undefined, try to retrieve it from the XML body here.
      const parsedBody = await parseXMLwithEmpty(body || "");
      for (const text in parsedBody) {
        if (
          Object.hasOwnProperty.bind(parsedBody)(text) &&
          text.toLowerCase() === "messagetext"
        ) {
          queueMessage.messageText = parsedBody[text];
          break;
        }
      }
    }

    if (queueMessage.messageText === undefined) {
      throw StorageErrorFactory.getInvalidXmlDocument(queueCtx.contextID!);
    }

    if (getUTF8ByteSize(queueMessage.messageText) > MESSAGETEXT_LENGTH_MAX) {
      throw StorageErrorFactory.getRequestBodyTooLarge(queueCtx.contextID!, {
        MaxLimit: `${MESSAGETEXT_LENGTH_MAX}`
      });
    }

    const message: MessageModel = {
      accountName,
      queueName,
      messageId: uuid(),
      insertionTime: new Date(context.startTime!),
      expirationTime: new Date(
        context.startTime!.getTime() + DEFAULT_MESSAGETTL * 1000
      ), // Default ttl is 7 days.
      dequeueCount: 0,
      timeNextVisible: new Date(context.startTime!),
      popReceipt: getPopReceipt(context.startTime!),
      persistency: EMPTY_EXTENT_CHUNK // Provide an empty item to initialize the whole object.
    };

    if (options.visibilitytimeout !== undefined) {
      if (
        options.visibilitytimeout < ENQUEUE_VISIBILITYTIMEOUT_MIN ||
        options.visibilitytimeout > ENQUEUE_VISIBILITYTIMEOUT_MAX
      ) {
        throw StorageErrorFactory.getOutOfRangeQueryParameterValue(
          context.contextID,
          {
            QueryParameterName: "visibilitytimeout",
            QueryParameterValue: `${options.visibilitytimeout}`,
            MinimumAllowed: `${ENQUEUE_VISIBILITYTIMEOUT_MIN}`,
            MaximumAllowed: `${ENQUEUE_VISIBILITYTIMEOUT_MAX}`
          }
        );
      }
      message.timeNextVisible.setTime(
        context.startTime!.getTime() + options.visibilitytimeout * 1000
      );
    }

    if (options.messageTimeToLive !== undefined) {
      if (options.messageTimeToLive === -1) {
        message.expirationTime = new Date(NEVER_EXPIRE_DATE);
      } else if (options.messageTimeToLive < MESSAGETTL_MIN) {
        throw StorageErrorFactory.getInvalidQueryParameterValue(
          context.contextID,
          {
            QueryParameterName: "messagettl",
            QueryParameterValue: `${options.messageTimeToLive}`,
            Reason: `Value must be greater than or equal to 1, or -1 to indicate an infinite TTL.`
          }
        );
      } else if (
        options.visibilitytimeout !== undefined &&
        options.visibilitytimeout >= options.messageTimeToLive
      ) {
        throw StorageErrorFactory.getInvalidQueryParameterValue(
          context.contextID,
          {
            QueryParameterName: "visibilitytimeout",
            QueryParameterValue: `${options.visibilitytimeout}`,
            Reason: `messagettl must be greater than visibilitytimeout.`
          }
        );
      } else {
        if (
          new Date(NEVER_EXPIRE_DATE).getTime() - context.startTime!.getTime() <
          options.messageTimeToLive * 1000
        ) {
          message.expirationTime = new Date(NEVER_EXPIRE_DATE);
        } else {
          message.expirationTime.setTime(
            context.startTime!.getTime() + options.messageTimeToLive * 1000
          );
        }
      }
    }

    // Write data to file system after the validation pass.
    const extentChunk = await this.extentStore.appendExtent(
      Buffer.from(queueMessage.messageText),
      context.contextID
    );
    message.persistency = extentChunk;

    await this.metadataStore.insertMessage(message);

    const response: any = [];
    const responseArray = response as Models.EnqueuedMessage[];
    const responseObject = response as Models.MessagesEnqueueHeaders & {
      statusCode: 201;
    };

    const enqueuedMessage: Models.EnqueuedMessage = message;
    responseArray.push(enqueuedMessage);

    responseObject.date = context.startTime!;
    responseObject.requestId = context.contextID;
    responseObject.version = QUEUE_API_VERSION;
    responseObject.statusCode = 201;
    responseObject.clientRequestId = options.requestId;

    return response;
  }