async function sendMessagesToResourceQueue()

in source/services/resource-selector/index.ts [294:367]


/**
 * Records a new task execution and enqueues one SQS message per resource on the Resource Queue.
 *
 * A parent execution ID and a UTC start time are generated once and shared by every message.
 * Messages are sent with SendMessageBatch in groups of at most 10 entries, and all batches are
 * dispatched concurrently.
 *
 * The `resources` array is not modified.
 *
 * @param resourceQueueUrl URL of the SQS queue that receives the resource messages.
 * @param resources Parsed ARNs of the resources to enqueue; one message is produced per element.
 * @param taskMetadata Task attributes copied into every message body.
 * @throws Error when SQS reports one or more failed entries in any batch response. A rejected
 *         SendMessageBatch call (or a rejected task-execution record write) propagates as-is.
 */
async function sendMessagesToResourceQueue(resourceQueueUrl: string, resources: ParsedArn[], taskMetadata: OpsConductorTaskMetadata): Promise<void> {
    // Number of entries placed in a single SendMessageBatch request
    const maxBatchSize = 10;

    const sqs = new AWS.SQS({ apiVersion: '2012-11-05' });
    const startTime = moment.utc().format('YYYY-MM-DD HH:mm:ss');
    const parentExecutionId = uuid.v4();

    LOGGER.info(`Number of resources to process: ${resources.length}`);

    await createTaskExecutionRecord(process.env.TaskExecutionsTableName, taskMetadata.taskId, parentExecutionId, resources.length, startTime);

    // Maps each batch entry ID back to its ARN so failed entries can be reported by ARN
    const idToArn = new Map<string, string>();

    // Build one entry per resource. The ID is derived from the resource's position rather than
    // its ARN, so entries stay distinct within a batch even if the same ARN appears twice.
    const entries: AWS.SQS.SendMessageBatchRequestEntry[] = resources.map((aResource, index) => {
        const msgId = `msg_id_${index}`;
        idToArn.set(msgId, aResource.fullArn);

        return {
            Id: msgId,
            MessageBody: JSON.stringify({
                MessageType: 'Resource',
                AutomationDocumentName: taskMetadata.automationDocumentName,
                TaskDescription: taskMetadata.taskDescription,
                TaskName: taskMetadata.taskName,
                TargetTag: taskMetadata.targetTag,
                TaskId: taskMetadata.taskId,
                ParentExecutionId: parentExecutionId,
                TaskParameters: taskMetadata.taskParameters,
                StartTime: startTime,
                ResourceARN: aResource.fullArn,
                // Fall back to the ARN's own region/account when no resource-specific value is set
                ResourceRegion: aResource.resourceRegion || aResource.region,
                ResourceAccount: aResource.resourceAccount || aResource.accountId,
                ResourceId: aResource.resourceId
            })
        };
    });

    // Split the entries into batches without mutating the caller's array
    const entryBatches: AWS.SQS.SendMessageBatchRequestEntry[][] = [];
    for (let start = 0; start < entries.length; start += maxBatchSize) {
        entryBatches.push(entries.slice(start, start + maxBatchSize));
    }

    LOGGER.info(`Number of message batches to send to queue: ${entryBatches.length}`);

    // Send all batches concurrently
    const sendMessageBatchResults = await Promise.all(
        entryBatches.map(batchEntries => {
            const params: AWS.SQS.SendMessageBatchRequest = {
                QueueUrl: resourceQueueUrl,
                Entries: batchEntries
            };

            return sqs.sendMessageBatch(params).promise();
        })
    );

    // A batch call can succeed overall while individual entries fail; log and count those
    let numFailedMessages = 0;
    for (const batchResult of sendMessageBatchResults) {
        for (const failedResult of batchResult.Failed || []) {
            LOGGER.error(`Failed to add ${idToArn.get(failedResult.Id)} to the Resource Queue. Reason: ${failedResult.Message} (${failedResult.Code})`);
            numFailedMessages++;
        }
    }

    if (numFailedMessages > 0) {
        throw new Error(`Got ${numFailedMessages} error(s) while adding messages to the Resource Queue`);
    }
}