cdk/lib/recipes-reindex.ts (212 lines of code) (raw):

import type { GuStack } from '@guardian/cdk/lib/constructs/core';
import { GuLambdaFunction } from '@guardian/cdk/lib/constructs/lambda';
import { Duration, RemovalPolicy } from 'aws-cdk-lib';
import type { IEventBus } from 'aws-cdk-lib/aws-events';
import { Effect, PolicyStatement } from 'aws-cdk-lib/aws-iam';
import { Architecture, Runtime } from 'aws-cdk-lib/aws-lambda';
import { Bucket } from 'aws-cdk-lib/aws-s3';
import {
  Choice,
  Condition,
  CustomState,
  DefinitionBody,
  Fail,
  StateMachine,
  Succeed,
  TaskInput,
  Wait,
  WaitTime,
} from 'aws-cdk-lib/aws-stepfunctions';
import { LambdaInvoke } from 'aws-cdk-lib/aws-stepfunctions-tasks';
import { Construct } from 'constructs';
import type { DataStore } from './datastore';

/**
 * Configuration for {@link RecipesReindex}.
 */
type RecipesReindexProps = {
  /**
   * Data store whose table (and its indexes) the snapshot Lambda is allowed
   * to scan/query. Its table name and `lastUpdatedIndexName` are passed to
   * that Lambda as environment variables.
   */
  dataStore: DataStore;
  /** Passed to the snapshot Lambda as `CONTENT_URL_BASE`. */
  contentUrlBase: string;
  /** Passed to the write-batch Lambda as `REINDEX_BATCH_SIZE`. */
  reindexBatchSize: number;
  /** Number of seconds the state machine waits between two batches. */
  reindexWaitTime: number;
  /**
   * Event bus the write-batch Lambda may call `events:PutEvents` on. Its name
   * is passed to that Lambda as `OUTGOING_EVENT_BUS`.
   */
  eventBus: IEventBus;
};

/**
 * Step Functions state machine, plus its two supporting Lambdas and an S3
 * bucket, that drives a reindex of recipes.
 *
 * The state machine runs these steps:
 *  1. `storeDryRun` copies `$.dryRun` from the execution input into a state
 *     machine variable so later states can still read it.
 *  2. `checkForOtherRunningTasks` lists the RUNNING executions of this same
 *     state machine.
 *  3. `IsOnlyRunningReindex` fails the execution if a second running
 *     execution is present, otherwise continues.
 *  4. `SnapshotOrderedIndexTask` invokes the snapshot Lambda.
 *  5. `WriteBatchToReindexQueueTask` invokes the write-batch Lambda with the
 *     previous Lambda's payload and the stored `dryRun` value.
 *  6. `IsReindexComplete` succeeds once `nextIndex` is greater than
 *     `lastIndex` in the payload; otherwise it waits `reindexWaitTime`
 *     seconds and goes back to step 5.
 */
export class RecipesReindex extends Construct {
  /**
   * @param scope - Stack that owns the construct; also supplies the stage,
   *   account and region used for resource names and ARNs.
   * @param id - Construct id.
   * @param props - See {@link RecipesReindexProps}.
   */
  constructor(
    scope: GuStack,
    id: string,
    {
      dataStore,
      contentUrlBase,
      reindexBatchSize,
      reindexWaitTime,
      eventBus,
    }: RecipesReindexProps,
  ) {
    super(scope, id);

    const appBase = 'recipes-reindex';
    // Both Lambdas are built from the same artifact and differ only by handler.
    const lambdaFileName = `${appBase}.zip`;

    // Holds the index snapshot: written by the snapshot Lambda, read back by
    // the write-batch Lambda.
    const snapshotBucket = new Bucket(this, 'SnapshotBucket', {
      enforceSSL: true,
      removalPolicy: RemovalPolicy.DESTROY,
    });

    const snapshotRecipeIndex = new GuLambdaFunction(
      scope,
      'SnapshotRecipeIndexLambda',
      {
        app: `${appBase}-snapshot-recipe-index`,
        description: 'Store a snapshot of the current recipe index in S3',
        fileName: lambdaFileName,
        handler: 'main.snapshotRecipeIndexHandler',
        functionName: `${appBase}-snapshot-recipe-index-${scope.stage}`,
        runtime: Runtime.NODEJS_20_X,
        initialPolicy: [
          new PolicyStatement({
            effect: Effect.ALLOW,
            actions: ['s3:PutObject'],
            resources: [snapshotBucket.bucketArn + '/*'],
          }),
          new PolicyStatement({
            effect: Effect.ALLOW,
            actions: ['dynamodb:Scan', 'dynamodb:Query'],
            resources: [
              dataStore.table.tableArn,
              dataStore.table.tableArn + '/index/*',
            ],
          }),
        ],
        environment: {
          CONTENT_URL_BASE: contentUrlBase,
          RECIPE_INDEX_SNAPSHOT_BUCKET: snapshotBucket.bucketName,
          INDEX_TABLE: dataStore.table.tableName,
          LAST_UPDATED_INDEX: dataStore.lastUpdatedIndexName,
        },
        architecture: Architecture.ARM_64,
        timeout: Duration.seconds(30),
        memorySize: 128,
      },
    );

    const writeBatchToReindexQueue = new GuLambdaFunction(
      scope,
      'WriteBatchToReindexQueueLambda',
      {
        // NOTE(review): `app` says "index-queue" while `functionName` says
        // "reindex-queue" - confirm whether the difference is intentional
        // before changing either.
        app: `${appBase}-write-batch-to-index-queue`,
        description: 'Write a batch of recipe ids to the reindex queue',
        fileName: lambdaFileName,
        handler: 'main.writeBatchToReindexQueueHandler',
        functionName: `${appBase}-write-batch-to-reindex-queue-${scope.stage}`,
        runtime: Runtime.NODEJS_20_X,
        initialPolicy: [
          new PolicyStatement({
            effect: Effect.ALLOW,
            actions: ['s3:GetObject'],
            resources: [snapshotBucket.bucketArn + '/*'],
          }),
          new PolicyStatement({
            effect: Effect.ALLOW,
            actions: ['events:PutEvents'],
            resources: [eventBus.eventBusArn],
          }),
          new PolicyStatement({
            effect: Effect.ALLOW,
            resources: ['*'],
            actions: ['cloudwatch:PutMetricData'],
          }),
        ],
        environment: {
          RECIPE_INDEX_SNAPSHOT_BUCKET: snapshotBucket.bucketName,
          REINDEX_BATCH_SIZE: reindexBatchSize.toString(),
          OUTGOING_EVENT_BUS: eventBus.eventBusName,
        },
        architecture: Architecture.ARM_64,
        timeout: Duration.seconds(30),
        memorySize: 128,
      },
    );

    // Necessary to reference `dryRun` later in the step fn: the next task's
    // output replaces the state input, so the flag is kept in a variable.
    const storeDryRun = new CustomState(scope, 'storeDryRun', {
      stateJson: {
        Type: 'Pass',
        Assign: {
          'dryRun.$': '$.dryRun',
        },
      },
    });

    const checkForOtherRunningReindexesTask = new CustomState(
      scope,
      'checkForOtherRunningTasks',
      {
        stateJson: {
          Type: 'Task',
          Parameters: {
            StatusFilter: 'RUNNING',
            // The ARN of the running state machine
            'StateMachineArn.$': '$$.StateMachine.Id',
          },
          Resource: 'arn:aws:states:::aws-sdk:sfn:listExecutions',
        },
      },
    );

    const snapshotOrderedIndexTask = new LambdaInvoke(
      this,
      'SnapshotOrderedIndexTask',
      {
        lambdaFunction: snapshotRecipeIndex,
        payload: TaskInput.fromObject({
          'executionId.$': '$$.Execution.Name',
        }),
      },
    );

    const writeBatchToReindexQueueTask = new LambdaInvoke(
      this,
      'WriteBatchToReindexQueueTask',
      {
        lambdaFunction: writeBatchToReindexQueue,
        // Only the previous Lambda's result is forwarded, not the whole
        // invoke response.
        inputPath: '$.Payload',
        payload: TaskInput.fromObject({
          'input.$': '$',
          // `$dryRun` (no dot) reads the variable assigned by `storeDryRun`.
          'dryRun.$': '$dryRun',
        }),
      },
    );

    const waitForThroughputAndWriteNextBatch = new Wait(
      scope,
      'WaitForThroughPut',
      {
        time: WaitTime.duration(Duration.seconds(reindexWaitTime)),
      },
    ).next(writeBatchToReindexQueueTask);

    const isReindexComplete = new Choice(this, 'IsReindexComplete')
      .when(
        Condition.numberGreaterThanJsonPath(
          '$.Payload.nextIndex',
          '$.Payload.lastIndex',
        ),
        new Succeed(this, 'Reindex complete'),
      )
      .otherwise(waitForThroughputAndWriteNextBatch);

    // Closes the loop: write batch -> complete? -> wait -> write batch ...
    snapshotOrderedIndexTask
      .next(writeBatchToReindexQueueTask)
      .next(isReindexComplete);

    // The current execution is itself RUNNING, so a second list entry means
    // another reindex is in progress.
    const isOnlyRunningReindex = new Choice(this, 'IsOnlyRunningReindex')
      .when(Condition.isNotPresent('$.Executions[1]'), snapshotOrderedIndexTask)
      .otherwise(
        new Fail(this, 'Fail', {
          error: 'ReindexingError',
          cause: 'Other reindexes running',
        }),
      );

    // Define the state machine
    const definition = storeDryRun
      .next(checkForOtherRunningReindexesTask)
      .next(isOnlyRunningReindex);

    // We define the name manually so we can construct the ARN manually and limit the scope
    // of the ListExecutions permission to the state machine itself - using `stateMachineArn`
    // or `stateMachineName` will introduce circular dependencies.
    const stateMachineName = `${appBase}-${scope.stage}-reindex-recipes`;

    const stateMachine = new StateMachine(this, 'ReindexingStateMachine', {
      definitionBody: DefinitionBody.fromChainable(definition),
      stateMachineName,
      // Applies to the whole execution, including every wait between batches.
      timeout: Duration.minutes(15),
    });

    stateMachine.addToRolePolicy(
      new PolicyStatement({
        effect: Effect.ALLOW,
        resources: [
          // Region comes from the stack (like the account) so the ARN matches
          // the state machine wherever the stack is deployed.
          `arn:aws:states:${scope.region}:${scope.account}:stateMachine:${stateMachineName}`,
        ],
        actions: ['states:ListExecutions'],
      }),
    );
  }
}