cdk/lib/constructs/pollerLambda.ts (149 lines of code) (raw):

import { GuAlarm } from '@guardian/cdk/lib/constructs/cloudwatch'; import type { GuStack } from '@guardian/cdk/lib/constructs/core'; import { GuLambdaFunction } from '@guardian/cdk/lib/constructs/lambda'; import { aws_sqs, Duration } from 'aws-cdk-lib'; import { ComparisonOperator, Metric, TreatMissingData, } from 'aws-cdk-lib/aws-cloudwatch'; import { LoggingFormat, RecursiveLoop } from 'aws-cdk-lib/aws-lambda'; import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources'; import { Secret } from 'aws-cdk-lib/aws-secretsmanager'; import type { PollerConfig, PollerId } from '../../../shared/pollers'; import { getPollerSecretName, POLLER_LAMBDA_ENV_VAR_KEYS, pollerIdToLambdaAppName, } from '../../../shared/pollers'; import { LAMBDA_ARCHITECTURE, LAMBDA_RUNTIME } from '../constants'; interface PollerLambdaProps { pollerId: PollerId; pollerConfig: PollerConfig; ingestionLambdaQueue: aws_sqs.Queue; alarmSnsTopicName: string; } export class PollerLambda { constructor( scope: GuStack, { pollerId, ingestionLambdaQueue, pollerConfig, alarmSnsTopicName, }: PollerLambdaProps, ) { const lambdaAppName = pollerIdToLambdaAppName(pollerId); const secret = new Secret(scope, `${pollerId}Secret`, { secretName: getPollerSecretName(scope.stage, pollerId), description: `Secret for the ${pollerId} poller lambda`, }); const timeout = Duration.seconds( pollerConfig.overrideLambdaTimeoutSeconds ?? 60, ); // we use queue here to allow lambda to call itself, but sometimes with a delay const lambdaQueue = new aws_sqs.Queue(scope, `${pollerId}LambdaQueue`, { queueName: `${scope.stack}-${scope.stage}-${lambdaAppName}_queue`, visibilityTimeout: timeout, // must be at least the same as the lambda retentionPeriod: Duration.minutes(5), // TODO dead letter queue // TODO consider setting retry count to zero }); if ( pollerConfig.idealFrequencyInSeconds && pollerConfig.idealFrequencyInSeconds > Duration.minutes(15).toSeconds() ) { throw Error( "SQS' max delay is 15mins, so if you really mean to poll so infrequently (e.g. hourly or daily), then you'll need to use Cloudwatch/EventBridge rules instead of SQS ", ); } // only one lambda should be happening at once, however the lambda queues up its next execution whilst it's still running // so concurrency of 2 should prevent throttling, but also guard against it going haywire const reservedConcurrentExecutions = 2; const functionName = `${scope.stack}-${scope.stage}-${lambdaAppName}`; const lambda = new GuLambdaFunction(scope, `${pollerId}Lambda`, { app: lambdaAppName, // varying app tag for each lambda allows riff-raff to find by tag functionName, runtime: LAMBDA_RUNTIME, architecture: LAMBDA_ARCHITECTURE, recursiveLoop: RecursiveLoop.ALLOW, // this allows the lambda to indirectly call itself via the SQS queue reservedConcurrentExecutions, environment: { [POLLER_LAMBDA_ENV_VAR_KEYS.INGESTION_LAMBDA_QUEUE_URL]: ingestionLambdaQueue.queueUrl, [POLLER_LAMBDA_ENV_VAR_KEYS.OWN_QUEUE_URL]: lambdaQueue.queueUrl, [POLLER_LAMBDA_ENV_VAR_KEYS.SECRET_NAME]: secret.secretName, }, throttlingMonitoring: { snsTopicName: alarmSnsTopicName, alarmDescription: `The ${functionName} is throttling.`, alarmName: `${functionName}_Throttling_Alarm`, okAction: true, toleratedThrottlingCount: 0, }, errorPercentageMonitoring: { snsTopicName: alarmSnsTopicName, alarmDescription: `The ${functionName} is erroring.`, alarmName: `${functionName}_Error_Alarm`, okAction: true, toleratedErrorPercentage: 0, }, memorySize: pollerConfig.overrideLambdaMemoryMB ?? 128, timeout, handler: `index.handlers.${pollerId}`, // see programmatically generated exports in poller-lambdas/src/index.ts fileName: `poller-lambdas.zip`, // shared zip for all the poller-lambdas loggingFormat: LoggingFormat.TEXT, }); secret.grantRead(lambda); secret.grantWrite(lambda); // wire up lambda to process its own queue lambda.addEventSource(new SqsEventSource(lambdaQueue, { batchSize: 1 })); // give permission to lambda to write back to its own queue (to facilitate recursive calling) lambdaQueue.grantSendMessages(lambda); // allow lambda to write to the ingestion-lambdas queue ingestionLambdaQueue.grantSendMessages(lambda); //TODO consider making that queue a destination for the poller-lambda and then the lambda just returns the payload (on success and failure) rather than using SQS SDK within the lambda const { stalledAlarmThreshold, stalledAlarmPeriod } = (() => { if (!pollerConfig.idealFrequencyInSeconds) { // long polling return { stalledAlarmThreshold: 1, stalledAlarmPeriod: Duration.minutes(3), // long polling (where we expect http requests would timeout in much less than 3 mins) }; } // fixed frequency polling const period = Duration.minutes(15); // fixed frequency polling (because 15mins is max delay for SQS) const threshold = Math.floor((period.toSeconds() / pollerConfig.idealFrequencyInSeconds) * 0.8); // * 0.8 to allow for some slack return {stalledAlarmThreshold: threshold, stalledAlarmPeriod: period}; })() // alarm if the lambda is not invoked often enough (i.e. stalled) new GuAlarm(scope, `${pollerId}LambdaStalledAlarm`, { app: lambdaAppName, snsTopicName: alarmSnsTopicName, alarmDescription: `The ${functionName} is potentially stalled (hasn't been invoked as frequently as expected).`, alarmName: `${functionName}_Stalled_Alarm`, okAction: true, threshold: stalledAlarmThreshold, evaluationPeriods: 1, metric: new Metric({ metricName: 'Invocations', namespace: 'AWS/Lambda', period: stalledAlarmPeriod, statistic: 'sum', dimensionsMap: { FunctionName: functionName, }, }), treatMissingData: TreatMissingData.BREACHING, // no invocations means no metric entries, so missing needs to mean we alarm comparisonOperator: ComparisonOperator.LESS_THAN_THRESHOLD, }); // alarm if the lambda is invoked too frequently (indicates it has perhaps gone haywire - which could exhaust rate limits etc.) new GuAlarm(scope, `${pollerId}LambdaHaywireAlarm`, { app: lambdaAppName, snsTopicName: alarmSnsTopicName, alarmDescription: `The ${functionName} has potentially gone haywire (has been invoked more frequently than expected).`, alarmName: `${functionName}_Haywire_Alarm`, okAction: true, threshold: pollerConfig.idealFrequencyInSeconds ? Math.ceil( // fixed frequency polling Duration.minutes(5).toSeconds() / pollerConfig.idealFrequencyInSeconds, ) * 1.5 : 60, // long polling (more than once per second is too much) evaluationPeriods: 1, metric: new Metric({ metricName: 'Invocations', namespace: 'AWS/Lambda', period: pollerConfig.idealFrequencyInSeconds ? Duration.minutes(5) // fixed frequency polling : Duration.minutes(1), // long polling statistic: 'sum', dimensionsMap: { FunctionName: functionName, }, }), treatMissingData: TreatMissingData.NOT_BREACHING, comparisonOperator: ComparisonOperator.GREATER_THAN_THRESHOLD, }); } }