cf-custom-resources/lib/backlog-per-task-calculator.js (99 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 "use strict"; const { ECS,DescribeServicesCommand } = require("@aws-sdk/client-ecs"); const { SQS,GetQueueUrlCommand, GetQueueAttributesCommand} = require("@aws-sdk/client-sqs"); // AWS Clients that are overriden in tests. let ecs, sqs; /** * This lambda function calculates the backlog of SQS messages per running ECS tasks, * and writes the metric to CloudWatch. */ exports.handler = async (event, context) => { setupClients(); try { const runningCount = await getRunningTaskCount(process.env.CLUSTER_NAME, process.env.SERVICE_NAME); const backlogs = await Promise.all( convertQueueNames(process.env.QUEUE_NAMES).map(async (queueName) => { const queueUrl = await getQueueURL(queueName); return { queueName: queueName, backlogPerTask: await getBacklogPerTask(queueUrl, runningCount), }; }) ); const timestamp = Date.now(); for (const {queueName, backlogPerTask} of backlogs) { emitBacklogPerTaskMetric(process.env.NAMESPACE, timestamp, queueName, backlogPerTask); } } catch(err) { // If there is any issue we won't log a metric. // This is okay because autoscaling will maintain the current number of running tasks if a data point is missing. // See https://docs.aws.amazon.com/AmazonECS/latest/developerguide/service-autoscaling-targettracking.html#targettracking-considerations console.error(`Unexpected error ${err}`); } }; /** * Returns the backlog per task. The backlog per task is calculated by dividing the number of messages in the queue with * the number of running tasks. * If there are no running task, we return the total number of messages in the queue so that we can start scaling up. * @param queueUrl The url of the queue. * @param runningTaskCount The number of running tasks part of the ECS service. * @return int The expected number of messages each running task will consume. */ const getBacklogPerTask = async (queueUrl, runningTaskCount) => { const adjustedRunningTasks = runningTaskCount === 0 ? 1 : runningTaskCount; const totalNumberOfMessages = await getQueueDepth(queueUrl); return Math.ceil(totalNumberOfMessages/adjustedRunningTasks); } /** * Writes the backlogPerTask metric for the given queue to stdout following the CloudWatch embedded metric format. * @see https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Generation.html * @param namespace The namespace for the metric. * @param timestamp The number of milliseconds after Jan 1, 1970 00:00:00 UTC used to emit the metric. * @param queueName The name of the queue. * @param backlogPerTask The number of messages in the queue divided by the number of running tasks. */ const emitBacklogPerTaskMetric = (namespace, timestamp, queueName, backlogPerTask) => { console.log(JSON.stringify({ "_aws": { "Timestamp": timestamp, "CloudWatchMetrics": [{ "Namespace": namespace, "Dimensions": [["QueueName"]], "Metrics": [{"Name":"BacklogPerTask", "Unit": "Count"}] }], }, "QueueName": queueName, "BacklogPerTask": backlogPerTask, })); } /** * Returns the URL for the SQS queue. * @param queueName The name of the queue. * @returns string The URL of the queue. */ const getQueueURL = async (queueName) => { const out = await sqs.send(new GetQueueUrlCommand({ QueueName: queueName, })); return out.QueueUrl; } /** * Returns the total number of messages in the SQS queue. * @param queueUrl The URL of the SQS queue. * @return int The ApproximateNumberOfMessages in the queue. */ const getQueueDepth = async (queueUrl) => { const out = await sqs.send(new GetQueueAttributesCommand({ QueueUrl: queueUrl, AttributeNames: ['ApproximateNumberOfMessages'], })); return out.Attributes.ApproximateNumberOfMessages; } /** * Returns the number of running tasks part of the service. * @param clusterId The short name or full Amazon Resource Name (ARN) of the cluster. * @param serviceName The service name or full Amazon Resource Name (ARN) of the service. * @returns int The number of tasks running part of the service. */ const getRunningTaskCount = async (clusterId, serviceName) => { const out = await ecs.send(new DescribeServicesCommand({ cluster: clusterId, services: [serviceName], })); if (out.services.length === 0) { throw new Error(`service ${serviceName} of cluster ${clusterId} does not exist`); } return out.services[0].runningCount; } /** * Create new clients. */ const setupClients = () => { ecs = new ECS(); sqs = new SQS(); } // convertQueueNames takes a comma separated string of SQS queue names and returns it as an array of strings. const convertQueueNames = (stringToSplit) => { return stringToSplit.split(',') }