in src/queue/handlers/MessagesHandler.ts [178:311]
public async enqueue(
queueMessage: Models.QueueMessage,
options: Models.MessagesEnqueueOptionalParams,
context: Context
): Promise<Models.MessagesEnqueueResponse> {
const queueCtx = new QueueStorageContext(context);
const accountName = queueCtx.account!;
const queueName = queueCtx.queue!;
if (queueMessage.messageText === undefined) {
const body = queueCtx.request!.getBody();
// TODO: deserialize does not support the message text with only empty character.
// If the text is undefined, try to retrieve it from the XML body here.
const parsedBody = await parseXMLwithEmpty(body || "");
for (const text in parsedBody) {
if (
Object.hasOwnProperty.bind(parsedBody)(text) &&
text.toLowerCase() === "messagetext"
) {
queueMessage.messageText = parsedBody[text];
break;
}
}
}
if (queueMessage.messageText === undefined) {
throw StorageErrorFactory.getInvalidXmlDocument(queueCtx.contextID!);
}
if (getUTF8ByteSize(queueMessage.messageText) > MESSAGETEXT_LENGTH_MAX) {
throw StorageErrorFactory.getRequestBodyTooLarge(queueCtx.contextID!, {
MaxLimit: `${MESSAGETEXT_LENGTH_MAX}`
});
}
const message: MessageModel = {
accountName,
queueName,
messageId: uuid(),
insertionTime: new Date(context.startTime!),
expirationTime: new Date(
context.startTime!.getTime() + DEFAULT_MESSAGETTL * 1000
), // Default ttl is 7 days.
dequeueCount: 0,
timeNextVisible: new Date(context.startTime!),
popReceipt: getPopReceipt(context.startTime!),
persistency: EMPTY_EXTENT_CHUNK // Provide an empty item to initialize the whole object.
};
if (options.visibilitytimeout !== undefined) {
if (
options.visibilitytimeout < ENQUEUE_VISIBILITYTIMEOUT_MIN ||
options.visibilitytimeout > ENQUEUE_VISIBILITYTIMEOUT_MAX
) {
throw StorageErrorFactory.getOutOfRangeQueryParameterValue(
context.contextID,
{
QueryParameterName: "visibilitytimeout",
QueryParameterValue: `${options.visibilitytimeout}`,
MinimumAllowed: `${ENQUEUE_VISIBILITYTIMEOUT_MIN}`,
MaximumAllowed: `${ENQUEUE_VISIBILITYTIMEOUT_MAX}`
}
);
}
message.timeNextVisible.setTime(
context.startTime!.getTime() + options.visibilitytimeout * 1000
);
}
if (options.messageTimeToLive !== undefined) {
if (options.messageTimeToLive === -1) {
message.expirationTime = new Date(NEVER_EXPIRE_DATE);
} else if (options.messageTimeToLive < MESSAGETTL_MIN) {
throw StorageErrorFactory.getInvalidQueryParameterValue(
context.contextID,
{
QueryParameterName: "messagettl",
QueryParameterValue: `${options.messageTimeToLive}`,
Reason: `Value must be greater than or equal to 1, or -1 to indicate an infinite TTL.`
}
);
} else if (
options.visibilitytimeout !== undefined &&
options.visibilitytimeout >= options.messageTimeToLive
) {
throw StorageErrorFactory.getInvalidQueryParameterValue(
context.contextID,
{
QueryParameterName: "visibilitytimeout",
QueryParameterValue: `${options.visibilitytimeout}`,
Reason: `messagettl must be greater than visibilitytimeout.`
}
);
} else {
if (
new Date(NEVER_EXPIRE_DATE).getTime() - context.startTime!.getTime() <
options.messageTimeToLive * 1000
) {
message.expirationTime = new Date(NEVER_EXPIRE_DATE);
} else {
message.expirationTime.setTime(
context.startTime!.getTime() + options.messageTimeToLive * 1000
);
}
}
}
// Write data to file system after the validation pass.
const extentChunk = await this.extentStore.appendExtent(
Buffer.from(queueMessage.messageText),
context.contextID
);
message.persistency = extentChunk;
await this.metadataStore.insertMessage(message);
const response: any = [];
const responseArray = response as Models.EnqueuedMessage[];
const responseObject = response as Models.MessagesEnqueueHeaders & {
statusCode: 201;
};
const enqueuedMessage: Models.EnqueuedMessage = message;
responseArray.push(enqueuedMessage);
responseObject.date = context.startTime!;
responseObject.requestId = context.contextID;
responseObject.version = QUEUE_API_VERSION;
responseObject.statusCode = 201;
responseObject.clientRequestId = options.requestId;
return response;
}