in source/services/queue-consumer/index.ts [88:140]
/**
 * Decides how many messages should be read from the queue on this pass, based on
 * how much room SSM currently has for additional automation work.
 *
 * The usable capacity is `SSM_CONCURRENT_LIMIT - PADDING_AMOUNT_FOR_SSM`. The number
 * of active/pending automation steps is subtracted from that capacity and the result
 * is clamped to the range `[0, SQS_RECEIVE_MESSAGE_MAX]`.
 *
 * @returns The number of messages to read; `0` when SSM has no spare capacity.
 * @throws Rethrows any error raised by `ssm.describeAutomationExecutions`, and throws
 *   an `Error` when the response is empty or has no `AutomationExecutionMetadataList`.
 */
async function getMaxNumberOfMessagesToReadFromQueue(): Promise<number> {
  let data: AWS.SSM.DescribeAutomationExecutionsResult | undefined;
  try {
    // Only a single page (at most SSM_CONCURRENT_LIMIT entries) is requested; any
    // NextToken in the response is not followed.
    data = await ssm
      .describeAutomationExecutions({
        Filters: [{ Key: 'ExecutionStatus', Values: SSM_ACTIVE_AND_PENDING_STATUSES }],
        MaxResults: SSM_CONCURRENT_LIMIT
      })
      .promise();
  } catch (err) {
    logger.error('Unable to query SSM for active executions. ssm.describeAutomationExecutions returned an error');
    throw err;
  }

  if (!data) {
    throw new Error('Unexpected response from ssm.describeAutomationExecutions');
  }

  const executions = data.AutomationExecutionMetadataList;
  if (!executions) {
    // The API should at least return an empty array
    logger.error('Unable to determine number of active SSM executions');
    throw new Error('ssm.describeAutomationExecutions response did not contain AutomationExecutionMetadataList');
  }

  // Capacity we are willing to use in SSM once the safety padding is held back.
  const usableCapacity = SSM_CONCURRENT_LIMIT - PADDING_AMOUNT_FOR_SSM;

  if (executions.length >= usableCapacity) {
    logger.info(`SSM is either close to or at its limit of active executions (${SSM_CONCURRENT_LIMIT}). We won't read anything from the queue in this case.`);
    return 0;
  }

  if (executions.length === 0) {
    // Nothing is running, so no per-automation step lookups are needed. Still cap the
    // answer by the usable SSM capacity so this branch agrees with the general case
    // below when that capacity is smaller than the SQS receive limit.
    logger.info('There are no SSM automations running');
    return Math.min(usableCapacity, SQS_RECEIVE_MESSAGE_MAX);
  }

  logger.info(`Current active/pending automations: ${executions.length}`);

  // NOTE(review): presumably each call resolves to the number of active steps for that
  // automation, with the second argument acting as an upper bound — confirm against
  // getNumberOfActiveStepsForAutomationId.
  const activeSteps = await Promise.all(
    executions.map(execution =>
      getNumberOfActiveStepsForAutomationId(execution.AutomationExecutionId, SSM_CONCURRENT_LIMIT)
    )
  );
  const totalActiveAutomationSteps = activeSteps.reduce((sum, steps) => sum + steps, 0);
  logger.info(`Current active/pending automation steps: ${totalActiveAutomationSteps}`);

  // A negative remainder means SSM has no bandwidth left (return 0); a remainder above
  // the SQS receive limit is capped at that limit; otherwise return the number of
  // executions that can be added before SSM starts queueing them on its side.
  const remainingCapacity = usableCapacity - totalActiveAutomationSteps;
  return Math.max(0, Math.min(remainingCapacity, SQS_RECEIVE_MESSAGE_MAX));
}