in source/services/resource-selector/index.ts [294:367]
async function sendMessagesToResourceQueue(resourceQueueUrl: string, resources: ParsedArn[], taskMetadata: OpsConductorTaskMetadata) {
const sqs = new AWS.SQS({ apiVersion: '2012-11-05' });
const startTime = moment.utc().format('YYYY-MM-DD HH:mm:ss');
const parentExecutionId = uuid.v4();
LOGGER.info(`Number of resources to process: ${resources.length}`);
await createTaskExecutionRecord(process.env.TaskExecutionsTableName, taskMetadata.taskId, parentExecutionId, resources.length, startTime);
// Create maps to associate an ARN with an ID to be supplied in the SendMessageBatch entry
const arnToId = {};
const idToArn = {};
for (let i = 0; i < resources.length; i++) {
const msgID = `msg_id_${i}`;
arnToId[resources[i].fullArn] = msgID;
idToArn[msgID] = resources[i].fullArn;
}
// Split the resourceARNs array into chunks of 10
const resourceChunks: ParsedArn[][] = [];
while (resources.length > 0) {
resourceChunks.push(resources.splice(0, 10));
}
LOGGER.info(`Number of message batches to send to queue: ${resourceChunks.length}`);
// Map the chunks into SQSSendMessageBatch promises
const sendMessageBatchRequests = resourceChunks.map(aChunk => {
const params: AWS.SQS.SendMessageBatchRequest = {
QueueUrl: resourceQueueUrl,
Entries: aChunk.map(aResource => {
const entry: AWS.SQS.SendMessageBatchRequestEntry = {
Id: arnToId[aResource.fullArn],
MessageBody: JSON.stringify({
MessageType: 'Resource',
AutomationDocumentName: taskMetadata.automationDocumentName,
TaskDescription: taskMetadata.taskDescription,
TaskName: taskMetadata.taskName,
TargetTag: taskMetadata.targetTag,
TaskId: taskMetadata.taskId,
ParentExecutionId: parentExecutionId,
TaskParameters: taskMetadata.taskParameters,
StartTime: startTime,
ResourceARN: aResource.fullArn,
ResourceRegion: aResource.resourceRegion ? aResource.resourceRegion : aResource.region,
ResourceAccount: aResource.resourceAccount ? aResource.resourceAccount : aResource.accountId,
ResourceId: aResource.resourceId
})
}
return entry;
})
};
return sqs.sendMessageBatch(params).promise();
});
const sendMessageBatchResults = await Promise.all(sendMessageBatchRequests);
let numFailedMessages: number = 0;
sendMessageBatchResults.forEach(batchResult => {
if (batchResult.Failed) {
batchResult.Failed.forEach(failedResult => {
LOGGER.error(`Failed to add ${idToArn[failedResult.Id]} to the Resource Queue. Reason: ${failedResult.Message} (${failedResult.Code})`);
numFailedMessages++;
});
}
});
if (numFailedMessages > 0) {
throw new Error(`Got ${numFailedMessages} error(s) while adding messages to the Resource Queue`);
}
}