notificationworkerlambda/cdk/lib/senderworker.ts (263 lines of code) (raw):

import * as cdk from 'aws-cdk-lib'
import * as ecr from 'aws-cdk-lib/aws-ecr'
import * as sqs from 'aws-cdk-lib/aws-sqs'
import * as sns from 'aws-cdk-lib/aws-sns'
import * as iam from 'aws-cdk-lib/aws-iam'
import * as lambda from 'aws-cdk-lib/aws-lambda'
import * as cloudwatch from 'aws-cdk-lib/aws-cloudwatch'
import * as ssm from 'aws-cdk-lib/aws-ssm'
import * as cdkcore from 'constructs'
import { SnsAction } from 'aws-cdk-lib/aws-cloudwatch-actions'
import { GuStack } from "@guardian/cdk/lib/constructs/core"
import { Duration, type App } from "aws-cdk-lib"
import type { GuStackProps } from "@guardian/cdk/lib/constructs/core"
import { Metric } from 'aws-cdk-lib/aws-cloudwatch'

/** Configuration for a single {@link SenderWorker}. */
interface SenderWorkerOpts {
  /** Lambda handler passed to the container image as its CMD. */
  handler: string,
  /** ECR repository containing the worker image. */
  imageRepo: ecr.IRepository,
  /** Image tag to deploy from `imageRepo`. */
  buildId: string,
  /** Reserved concurrent executions for the sender lambda. */
  reservedConcurrency: number,
  /** SNS topic that receives both ALARM and OK transitions of every alarm below. */
  alarmTopic: sns.ITopic,
  /**
   * NOTE(review): currently unused — the too-few-invocations period is derived
   * from `dailyAlarmPeriod` instead. Kept so the stack parameter and callers stay valid.
   */
  tooFewInvocationsAlarmPeriod: cdk.Duration,
  /** Whether the too-few-invocations alarm is allowed to trigger its SNS actions. */
  tooFewInvocationsEnabled: boolean,
  /** ARN of the cleaner queue the sender lambda is allowed to write to. */
  cleanerQueueArn: string,
  /** Platform name exposed to the lambda and used in its SSM config path. */
  platform: string,
  /** Prefix of the SSM parameter that advertises the sender queue URL to the harvester. */
  paramPrefix: string,
  /** When true the lambda consumes up to 20 messages per batch and gets longer timeouts. */
  isBatchingSqsMessages: boolean,
  /** When true the too-few-invocations alarm looks at a whole day instead of one hour. */
  dailyAlarmPeriod: boolean,
  /** When true, add per-notification-type (breaking / non-breaking) "too few" alarms. */
  tooFewNotificationByTypeAlarms: boolean,
}

/**
 * One notification sender: an SQS queue (with a dead-letter queue) feeding a
 * container lambda, the lambda's execution role, CloudWatch alarms on the lambda,
 * and an SSM parameter that advertises the queue URL to the harvester app.
 */
class SenderWorker extends cdkcore.Construct {
  readonly senderSqs: sqs.Queue

  constructor(scope: GuStack, id: string, props: SenderWorkerOpts) {
    super(scope, id)

    cdk.Tags.of(this).add("App", id)

    const snsTopicAction = new SnsAction(props.alarmTopic)

    // Every alarm reports both the ALARM and OK transitions to the same topic.
    const wireAlarmActions = (alarm: cloudwatch.Alarm) => {
      alarm.addAlarmAction(snsTopicAction)
      alarm.addOkAction(snsTopicAction)
    }

    const senderDlq = new sqs.Queue(this, 'SenderDlq')

    this.senderSqs = new sqs.Queue(this, 'SenderSqs', {
      // kept above the lambda timeout below (180s batching / 90s otherwise)
      visibilityTimeout: props.isBatchingSqsMessages ? cdk.Duration.seconds(190) : cdk.Duration.seconds(100),
      retentionPeriod: cdk.Duration.hours(1),
      deadLetterQueue: {
        queue: senderDlq,
        maxReceiveCount: 5
      }
    })

    // Use the stack's region everywhere (previously the logs ARNs hard-coded eu-west-1
    // while the SSM ARN below already used scope.region).
    const logsArnPrefix = `arn:aws:logs:${scope.region}:${scope.account}`

    const executionRole = new iam.Role(this, 'ExecutionRole', {
      assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com'),
      path: "/",
      inlinePolicies: {
        logs: new iam.PolicyDocument({
          statements: [
            new iam.PolicyStatement({
              actions: ['logs:CreateLogGroup'],
              resources: [`${logsArnPrefix}:*`]
            }),
            new iam.PolicyStatement({
              actions: ['logs:CreateLogStream', 'logs:PutLogEvents'],
              resources: [`${logsArnPrefix}:log-group:/aws/lambda/*:*`]
            })
          ]
        }),
        // NOTE(review): the SQSOutput/SQSInput names look swapped — this one covers the
        // queue the lambda consumes from, the next covers the cleaner queue it writes to.
        // Names (and the broad sqs:*) left unchanged to keep the role's policies stable.
        SQSOutput: new iam.PolicyDocument({
          statements: [
            new iam.PolicyStatement({
              actions: ['sqs:*'],
              resources: [this.senderSqs.queueArn]
            })
          ]
        }),
        SQSInput: new iam.PolicyDocument({
          statements: [
            new iam.PolicyStatement({
              actions: ['sqs:SendMessage'],
              resources: [props.cleanerQueueArn]
            })
          ]
        }),
        Conf: new iam.PolicyDocument({
          statements: [
            new iam.PolicyStatement({
              actions: ['ssm:GetParametersByPath'],
              resources: [`arn:aws:ssm:${scope.region}:${scope.account}:parameter/notifications/${scope.stage}/workers/${props.platform}`]
            })
          ]
        }),
        Cloudwatch: new iam.PolicyDocument({
          statements: [
            new iam.PolicyStatement({
              actions: ['cloudwatch:PutMetricData'],
              resources: ['*']
            })
          ]
        }),
      }
    })

    const codeImage = lambda.DockerImageCode.fromEcr(props.imageRepo, {
      cmd: [props.handler],
      tag: props.buildId
    })

    const senderLambdaCtr = new lambda.DockerImageFunction(this, 'SenderLambdaCtr', {
      functionName: `${scope.stack}-${id}-sender-ctr-${scope.stage}`,
      code: codeImage,
      environment: {
        Stage: scope.stage,
        Stack: scope.stack,
        App: id,
        Platform: props.platform,
      },
      memorySize: 10240,
      description: `sends notifications for ${id}`,
      role: executionRole,
      timeout: props.isBatchingSqsMessages ? cdk.Duration.seconds(180) : cdk.Duration.seconds(90),
      reservedConcurrentExecutions: props.reservedConcurrency
    })

    const senderSqsEventSourceMapping = new lambda.EventSourceMapping(this, "SenderSqsEventSourceMapping", {
      batchSize: props.isBatchingSqsMessages ? 20 : 1,
      maxBatchingWindow: props.isBatchingSqsMessages ? cdk.Duration.seconds(1) : cdk.Duration.seconds(0),
      enabled: true,
      eventSourceArn: this.senderSqs.queueArn,
      target: senderLambdaCtr
    })
    senderSqsEventSourceMapping.node.addDependency(this.senderSqs)
    senderSqsEventSourceMapping.node.addDependency(senderLambdaCtr)

    const senderThrottleAlarm = new cloudwatch.Alarm(this, 'SenderThrottleAlarm', {
      alarmDescription: `Triggers if the ${id} sender lambda is throttled in ${scope.stage}.`,
      comparisonOperator: cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD,
      evaluationPeriods: 1,
      threshold: 0,
      metric: senderLambdaCtr.metricThrottles({ period: cdk.Duration.seconds(360), statistic: "Sum" }),
      treatMissingData: cloudwatch.TreatMissingData.NOT_BREACHING
    })
    wireAlarmActions(senderThrottleAlarm)

    const senderErrorAlarm = new cloudwatch.Alarm(this, 'SenderErrorAlarm', {
      alarmDescription: `Triggers if the ${id} sender lambda errors in ${scope.stage}.`,
      comparisonOperator: cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD,
      evaluationPeriods: 1,
      threshold: 0,
      metric: senderLambdaCtr.metricErrors({ period: cdk.Duration.seconds(360), statistic: "Sum" }),
      treatMissingData: cloudwatch.TreatMissingData.NOT_BREACHING
    })
    wireAlarmActions(senderErrorAlarm)

    const senderTooFewInvocationsAlarm = new cloudwatch.Alarm(this, 'SenderTooFewInvocationsAlarm', {
      alarmDescription: `Triggers if the ${id} sender lambda is not frequently invoked in ${scope.stage}.`,
      comparisonOperator: cloudwatch.ComparisonOperator.LESS_THAN_OR_EQUAL_TO_THRESHOLD,
      evaluationPeriods: 1,
      threshold: 0,
      // whole day for editions, 60 minutes for others
      metric: senderLambdaCtr.metricInvocations({ period: cdk.Duration.seconds(props.dailyAlarmPeriod ? 60 * 60 * 24 : 60 * 60), statistic: "Sum" }),
      treatMissingData: cloudwatch.TreatMissingData.BREACHING,
      // Previously hard-coded to `true`, ignoring this option (which the stack sets to PROD-only).
      actionsEnabled: props.tooFewInvocationsEnabled,
    })
    wireAlarmActions(senderTooFewInvocationsAlarm)

    if (props.tooFewNotificationByTypeAlarms) {
      /**
       * Alarms when no notification of the given `type` dimension has been processed
       * for `evaluationPeriods` consecutive 15-minute windows (missing data breaches).
       */
      const addTooFewByTypeAlarm = (constructId: string, type: string, label: string, evaluationPeriods: number) => {
        const countMetric = new Metric({
          namespace: `Notifications/${scope.stage}/workers`,
          metricName: "worker.notificationProcessingTime",
          period: Duration.minutes(15),
          statistic: "SampleCount",
          dimensionsMap: {
            platform: id,
            type,
          },
        })
        const alarm = new cloudwatch.Alarm(this, constructId, {
          alarmDescription: `Triggers if the ${id} sender lambda is not frequently invoked for ${label} notification in ${scope.stage}.`,
          comparisonOperator: cloudwatch.ComparisonOperator.LESS_THAN_OR_EQUAL_TO_THRESHOLD,
          evaluationPeriods,
          threshold: 0,
          metric: countMetric,
          treatMissingData: cloudwatch.TreatMissingData.BREACHING,
          actionsEnabled: (scope.stage === 'PROD'),
        })
        wireAlarmActions(alarm)
      }

      // 4 x 15 minutes = 1 hour without a non-breaking notification
      addTooFewByTypeAlarm('SenderTooFewNonBreakingAlarm', 'other', 'non-breaking news', 4)
      // 24 x 15 minutes = 6 hours without a breaking news notification
      addTooFewByTypeAlarm('SenderTooFewBreakingNewsAlarm', 'breakingNews', 'breaking news', 24)
    }

    // this advertises the name of the sender queue to the harvester app
    new ssm.StringParameter(this, 'SenderQueueSSMParameter', {
      parameterName: `/notifications/${scope.stage}/workers/harvester/${props.paramPrefix}SqsCdkUrl`,
      simpleName: false,
      stringValue: this.senderSqs.queueUrl,
      tier: ssm.ParameterTier.STANDARD,
      dataType: ssm.ParameterDataType.TEXT
    })
  }
}

/**
 * Stack containing one {@link SenderWorker} per platform/app, and exporting the
 * comma-separated list of their queue ARNs for other stacks (e.g. the harvester).
 */
export class SenderWorkerStack extends GuStack {
  constructor(scope: App, id: string, props: GuStackProps) {
    super(scope, id, props)

    const senderTooFewInvocationsAlarmPeriodParam = new cdk.CfnParameter(this, "SenderTooFewInvocationsAlarmPeriodParam", {
      type: "Number",
      description: "How long until no execution is suspicious, in seconds"
    })

    const reservedConcurrencyParam = new cdk.CfnParameter(this, "ReservedConcurrency", {
      type: "Number",
      description: "How many concurrent execution to provision the lamdba with"
    })

    const buildIdParam = new cdk.CfnParameter(this, "BuildId", {
      type: "String",
      description: "build id from teamcity, the image should be tagged with this"
    })

    const alarmTopicArnParam = new cdk.CfnParameter(this, "AlarmTopicArn", {
      type: "String",
      description: "The ARN of the SNS topic to send all the cloudwatch alarms to"
    })

    const cleanerQueueArnParam = new cdk.CfnParameter(this, "CleanerQueueArnParam", {
      type: "String",
      description: "The ARN of the cleaner SQS queue"
    })

    const notificationEcrRepo = ecr.Repository.fromRepositoryAttributes(this, "NotificationLambdaRepository", {
      repositoryArn: cdk.Fn.importValue("NotificationLambdaRepositoryArn"),
      repositoryName: cdk.Fn.importValue("NotificationLambdaRepositoryName")
    })

    // Options common to every worker.
    const sharedOpts = {
      imageRepo: notificationEcrRepo,
      buildId: buildIdParam.valueAsString,
      reservedConcurrency: reservedConcurrencyParam.valueAsNumber,
      alarmTopic: sns.Topic.fromTopicArn(this, 'AlarmTopic', alarmTopicArnParam.valueAsString),
      tooFewInvocationsAlarmPeriod: cdk.Duration.seconds(senderTooFewInvocationsAlarmPeriodParam.valueAsNumber),
      tooFewInvocationsEnabled: props.stage === 'PROD',
      cleanerQueueArn: cleanerQueueArnParam.valueAsString
    }

    const workerQueueArns: string[] = []

    /** Creates one worker and records its queue ARN for the stack output. */
    const addWorker = (
      workerName: string,
      paramPrefix: string,
      handler: string,
      {
        isBatchingSqsMessages = false,
        dailyAlarmPeriod = false,
        tooFewNotificationByTypeAlarms = false,
      }: { isBatchingSqsMessages?: boolean, dailyAlarmPeriod?: boolean, tooFewNotificationByTypeAlarms?: boolean } = {},
    ) => {
      const worker = new SenderWorker(this, workerName, {
        ...sharedOpts,
        platform: workerName,
        paramPrefix,
        handler,
        isBatchingSqsMessages,
        dailyAlarmPeriod,
        tooFewNotificationByTypeAlarms,
      })
      workerQueueArns.push(worker.senderSqs.queueArn)
    }

    /*
     * add each of the worker lambdas, where each one handles a different
     * platform or app by talking to a different lambda handler function
     */
    addWorker("ios", "iosLive", "com.gu.notifications.worker.IOSSender::handleChunkTokens", { tooFewNotificationByTypeAlarms: true })
    addWorker("android", "androidLive", "com.gu.notifications.worker.AndroidSender::handleChunkTokens", { isBatchingSqsMessages: true, tooFewNotificationByTypeAlarms: true })
    // edition apps only send one notification a day in order to get content for that day
    addWorker("ios-edition", "iosEdition", "com.gu.notifications.worker.IOSSender::handleChunkTokens", { dailyAlarmPeriod: true })
    addWorker("android-edition", "androidEdition", "com.gu.notifications.worker.AndroidSender::handleChunkTokens", { dailyAlarmPeriod: true })
    addWorker("android-beta", "androidBeta", "com.gu.notifications.worker.AndroidSender::handleChunkTokens")

    /*
     * each worker has been assigned an SQS queue which, when written to, will
     * trigger it to send its notifications. Here, we export the list of worker
     * queue ARNs so that it can be used in other stacks, for example, Harvester
     * needs to give itself permission to write to these queues.
     */
    new cdk.CfnOutput(this, "NotificationSenderWorkerQueueArns", {
      exportName: "NotificationSenderWorkerQueueArns-" + this.stage,
      value: cdk.Fn.join(",", workerQueueArns)
    })
  }
}