cdk/lib/newswires.ts (240 lines of code) (raw):

import type { Alarms } from '@guardian/cdk'; import { GuPlayApp, GuScheduledLambda } from '@guardian/cdk'; import { AccessScope } from '@guardian/cdk/lib/constants'; import type { NoMonitoring } from '@guardian/cdk/lib/constructs/cloudwatch'; import { GuParameter, GuStack } from '@guardian/cdk/lib/constructs/core'; import type { GuStackProps } from '@guardian/cdk/lib/constructs/core'; import { GuCname } from '@guardian/cdk/lib/constructs/dns'; import { GuVpc, SubnetType } from '@guardian/cdk/lib/constructs/ec2'; import { GuGetS3ObjectsPolicy } from '@guardian/cdk/lib/constructs/iam'; import { GuLambdaFunction } from '@guardian/cdk/lib/constructs/lambda'; import { GuS3Bucket } from '@guardian/cdk/lib/constructs/s3'; import type { App } from 'aws-cdk-lib'; import { aws_logs, Duration } from 'aws-cdk-lib'; import { InstanceClass, InstanceSize, InstanceType, Port, } from 'aws-cdk-lib/aws-ec2'; import { Schedule } from 'aws-cdk-lib/aws-events'; import { LoggingFormat } from 'aws-cdk-lib/aws-lambda'; import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources'; import { LogGroup, MetricFilter } from 'aws-cdk-lib/aws-logs'; import { DatabaseInstanceEngine, PostgresEngineVersion, StorageType, } from 'aws-cdk-lib/aws-rds'; import { Topic } from 'aws-cdk-lib/aws-sns'; import type { Queue } from 'aws-cdk-lib/aws-sqs'; import { SUCCESSFUL_INGESTION_EVENT_TYPE } from '../../shared/constants'; import type { PollerId } from '../../shared/pollers'; import { POLLERS_CONFIG } from '../../shared/pollers'; import { LAMBDA_ARCHITECTURE, LAMBDA_RUNTIME } from './constants'; import { GuDatabase } from './constructs/database'; import { PollerLambda } from './constructs/pollerLambda'; export type NewswiresProps = GuStackProps & { sourceQueue: Queue; fingerpostQueue: Queue; domainName: string; enableMonitoring: boolean; }; const app = 'newswires'; export class Newswires extends GuStack { constructor(scope: App, id: string, props: NewswiresProps) { super(scope, id, { ...props, app }); const { domainName, enableMonitoring } = props; const vpc = GuVpc.fromIdParameter(this, 'VPC'); const privateSubnets = GuVpc.subnetsFromParameter(this, { type: SubnetType.PRIVATE, app, }); const stageStackApp = `${this.stage}/${this.stack}/${app}`; const databaseName = 'newswires'; const database = new GuDatabase(this, 'NewswiresDB', { app, instanceType: InstanceType.of(InstanceClass.T4G, InstanceSize.SMALL), allowExternalConnection: true, databaseName, multiAz: false, devxBackups: true, vpcSubnets: { subnets: privateSubnets, }, engine: DatabaseInstanceEngine.postgres({ version: PostgresEngineVersion.of('16.4', '16'), // version: PostgresEngineVersion.VER_16, // FIXME temporary, until VER_16 defaults to 16.4 }), storageType: StorageType.GP3, iops: 3000, // the default for gp3 - not required but nice to declare storageThroughput: 125, // the default for gp3 autoMinorVersionUpgrade: true, }); const feedsBucket = new GuS3Bucket(this, `feeds-bucket-${this.stage}`, { app, versioned: true, }); const ingestionLambda = new GuLambdaFunction( this, `IngestionLambda-${this.stage}`, { app: 'ingestion-lambda', runtime: LAMBDA_RUNTIME, architecture: LAMBDA_ARCHITECTURE, handler: 'handler.main', // each execution can handle up to 10 messages, so this is up // to 100 messages in flight; should be more than powerful enough // in high news events reservedConcurrentExecutions: 10, fileName: 'ingestion-lambda.zip', environment: { FEEDS_BUCKET_NAME: feedsBucket.bucketName, DATABASE_ENDPOINT_ADDRESS: database.dbInstanceEndpointAddress, DATABASE_PORT: database.dbInstanceEndpointPort, DATABASE_NAME: databaseName, }, vpc, vpcSubnets: { subnets: privateSubnets, }, loggingFormat: LoggingFormat.TEXT, }, ); ingestionLambda.connections.allowTo(database, Port.tcp(5432)); const eventSourceProps = { /** * This is required to allow us to deal with failures of particular * records handed to a lambda by an SQS queue. Without this, the * lambda will retry the entire batch of records, which is not what * we want. * * See https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting * */ reportBatchItemFailures: true, }; ingestionLambda.addEventSource( new SqsEventSource(props.sourceQueue, eventSourceProps), ); ingestionLambda.addEventSource( new SqsEventSource(props.fingerpostQueue, eventSourceProps), ); feedsBucket.grantWrite(ingestionLambda); database.grantConnect(ingestionLambda); new MetricFilter(this, 'IngestionSourceFeeds', { logGroup: LogGroup.fromLogGroupName( this, 'IngestionLogGroup', `/aws/lambda/${ingestionLambda.functionName}`, ), metricNamespace: `${stageStackApp}-ingestion-lambda`, metricName: 'IngestionSourceFeeds', metricValue: '1', filterPattern: aws_logs.FilterPattern.stringValue( '$.eventType', '=', SUCCESSFUL_INGESTION_EVENT_TYPE, ), dimensions: { supplier: '$.supplier' }, }); const scheduledCleanupLambda = new GuScheduledLambda( this, `ScheduledCleanupLambda-${this.stage}`, { app: 'cleanup-lambda', runtime: LAMBDA_RUNTIME, architecture: LAMBDA_ARCHITECTURE, handler: 'handler.main', fileName: 'cleanup-lambda.zip', timeout: Duration.millis(45000), environment: { DATABASE_ENDPOINT_ADDRESS: database.dbInstanceEndpointAddress, DATABASE_PORT: database.dbInstanceEndpointPort, DATABASE_NAME: databaseName, }, vpc, vpcSubnets: { subnets: privateSubnets, }, rules: [ { schedule: Schedule.cron({ hour: '5', minute: '00', weekDay: '*' }), // Every day at 5am }, ], monitoringConfiguration: { noMonitoring: true, }, }, ); scheduledCleanupLambda.connections.allowTo(database, Port.tcp(5432)); database.grantConnect(scheduledCleanupLambda); const panDomainSettingsBucket = new GuParameter( this, 'PanDomainSettingsBucket', { description: 'Bucket name for pan-domain auth settings', fromSSM: true, default: `/${stageStackApp}/pan-domain-settings-bucket`, type: 'String', }, ); const permissionsBucketName = new GuParameter(this, 'PermissionsBucket', { description: 'Bucket name for permissions data', fromSSM: true, default: `/${stageStackApp}/permissions-bucket`, type: 'String', }); const alarmSnsTopic = new Topic(this, `${app}-email-alarm-topic`); const scaling = { minimumInstances: 1, maximumInstances: 2, }; const monitoringConfiguration: Alarms | NoMonitoring = enableMonitoring ? { snsTopicName: alarmSnsTopic.topicName, unhealthyInstancesAlarm: true, http5xxAlarm: { tolerated5xxPercentage: 10, numberOfMinutesAboveThresholdBeforeAlarm: 1, }, } : { noMonitoring: true }; Object.entries(POLLERS_CONFIG).map( ([pollerId, pollerConfig]) => new PollerLambda(this, { pollerId: pollerId as PollerId, pollerConfig, ingestionLambdaQueue: props.sourceQueue, alarmSnsTopicName: alarmSnsTopic.topicName, }), ); const newswiresApp = new GuPlayApp(this, { app, access: { scope: AccessScope.PUBLIC }, privateSubnets, instanceType: InstanceType.of(InstanceClass.T4G, InstanceSize.SMALL), certificateProps: { domainName, }, monitoringConfiguration, userData: { distributable: { fileName: `${app}.deb`, executionStatement: `dpkg -i /${app}/${app}.deb`, }, }, scaling, applicationLogging: { enabled: true, systemdUnitName: app }, imageRecipe: 'editorial-tools-jammy-java17', roleConfiguration: { additionalPolicies: [ new GuGetS3ObjectsPolicy(this, 'PandaAuthPolicy', { bucketName: panDomainSettingsBucket.valueAsString, }), new GuGetS3ObjectsPolicy(this, 'PermissionsCachePolicy', { bucketName: permissionsBucketName.valueAsString, paths: [`${this.stage}/*`], }), ], }, }); // Add the domain name new GuCname(this, 'DnsRecord', { app: app, domainName: domainName, ttl: Duration.minutes(1), resourceRecord: newswiresApp.loadBalancer.loadBalancerDnsName, }); database.grantConnect(newswiresApp.autoScalingGroup); newswiresApp.autoScalingGroup.connections.addSecurityGroup( database.accessSecurityGroup, ); } }