packages/blueprints/gen-ai-chatbot/static-assets/chatbot-genai-cdk/lib/constructs/embedding.ts (273 lines of code) (raw):

import { Construct } from "constructs"; import * as ec2 from "aws-cdk-lib/aws-ec2"; import * as path from "path"; import { CfnOutput, Duration, RemovalPolicy, Stack } from "aws-cdk-lib"; import { DockerImageAsset, Platform } from "aws-cdk-lib/aws-ecr-assets"; import * as iam from "aws-cdk-lib/aws-iam"; import { ITable } from "aws-cdk-lib/aws-dynamodb"; import { CfnPipe } from "aws-cdk-lib/aws-pipes"; import * as ecs from "aws-cdk-lib/aws-ecs"; import * as logs from "aws-cdk-lib/aws-logs"; import { IBucket } from "aws-cdk-lib/aws-s3"; import * as lambda from "aws-cdk-lib/aws-lambda"; import { ISecret } from "aws-cdk-lib/aws-secretsmanager"; import * as cdk from "aws-cdk-lib"; import { DockerImageCode, DockerImageFunction, IFunction, } from "aws-cdk-lib/aws-lambda"; import { DynamoEventSource } from "aws-cdk-lib/aws-lambda-event-sources"; import { SociIndexBuild } from "deploy-time-build"; export interface EmbeddingProps { readonly vpc: ec2.IVpc; readonly database: ITable; readonly dbSecrets: ISecret; readonly bedrockRegion: string; readonly tableAccessRole: iam.IRole; readonly documentBucket: IBucket; readonly embeddingContainerVcpu: number; readonly embeddingContainerMemory: number; } export class Embedding extends Construct { readonly taskSecurityGroup: ec2.ISecurityGroup; readonly container: ecs.ContainerDefinition; readonly removalHandler: IFunction; constructor(scope: Construct, id: string, props: EmbeddingProps) { super(scope, id); /** * ECS */ const cluster = new ecs.Cluster(this, "Cluster", { vpc: props.vpc, containerInsights: true, }); const taskDefinition = new ecs.FargateTaskDefinition( this, "TaskDefinition", { cpu: props.embeddingContainerVcpu, memoryLimitMiB: props.embeddingContainerMemory, runtimePlatform: { cpuArchitecture: ecs.CpuArchitecture.X86_64, operatingSystemFamily: ecs.OperatingSystemFamily.LINUX, }, } ); taskDefinition.addToTaskRolePolicy( new iam.PolicyStatement({ actions: ["bedrock:*"], resources: ["*"], }) ); taskDefinition.addToTaskRolePolicy( new iam.PolicyStatement({ actions: ["sts:AssumeRole"], resources: [props.tableAccessRole.roleArn], }) ); const taskLogGroup = new logs.LogGroup(this, "TaskLogGroup", { removalPolicy: RemovalPolicy.DESTROY, retention: logs.RetentionDays.ONE_WEEK, }); const asset = new DockerImageAsset(this, "Image", { directory: path.join(__dirname, "../../../backend"), file: "embedding.Dockerfile", platform: Platform.LINUX_AMD64, }); SociIndexBuild.fromDockerImageAsset(this, "Index", asset); const container = taskDefinition.addContainer("Container", { image: ecs.AssetImage.fromDockerImageAsset(asset), logging: ecs.LogDriver.awsLogs({ streamPrefix: "embed-task", logGroup: taskLogGroup, }), environment: { BEDROCK_REGION: props.bedrockRegion, DB_SECRETS_ARN: props.dbSecrets.secretArn, ACCOUNT: Stack.of(this).account, REGION: Stack.of(this).region, TABLE_NAME: props.database.tableName, TABLE_ACCESS_ROLE_ARN: props.tableAccessRole.roleArn, DOCUMENT_BUCKET: props.documentBucket.bucketName, }, }); taskLogGroup.grantWrite(container.taskDefinition.executionRole!); props.dbSecrets.grantRead(container.taskDefinition.taskRole); const taskSg = new ec2.SecurityGroup(this, "TaskSecurityGroup", { vpc: props.vpc, allowAllOutbound: true, }); /** * EventBridge Pipes */ const pipeLogGroup = new logs.LogGroup(this, "PipeLogGroup", { removalPolicy: RemovalPolicy.DESTROY, retention: logs.RetentionDays.ONE_WEEK, }); const pipeRole = new iam.Role(this, "PipeRole", { assumedBy: new iam.ServicePrincipal("pipes.amazonaws.com"), }); pipeRole.addToPolicy( new iam.PolicyStatement({ actions: [ "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", ], resources: [props.database.tableStreamArn!], }) ); pipeRole.addToPolicy( new iam.PolicyStatement({ actions: ["ecs:RunTask"], resources: [ taskDefinition.taskDefinitionArn, `${taskDefinition.taskDefinitionArn}:*`, ], }) ); pipeRole.addToPolicy( new iam.PolicyStatement({ actions: ["iam:PassRole"], resources: ["*"], conditions: { StringLike: { "iam:PassedToService": "ecs-tasks.amazonaws.com", }, }, }) ); const pipe = new CfnPipe(this, "Pipe", { source: props.database.tableStreamArn!, sourceParameters: { dynamoDbStreamParameters: { batchSize: 1, startingPosition: "LATEST", maximumRetryAttempts: 1, // Avoid infinite retry which causes stuck }, filterCriteria: { // Trigger when bot is created or updated filters: [ { pattern: '{"dynamodb":{"NewImage":{"SyncStatus":{"S":[{"prefix":"QUEUED"}]}}}}', }, ], }, }, target: cluster.clusterArn, targetParameters: { ecsTaskParameters: { enableEcsManagedTags: false, enableExecuteCommand: false, launchType: "FARGATE", networkConfiguration: { awsvpcConfiguration: { assignPublicIp: "DISABLED", subnets: props.vpc.selectSubnets({ subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS, }).subnetIds, securityGroups: [taskSg.securityGroupId], }, }, taskCount: 1, taskDefinitionArn: taskDefinition.taskDefinitionArn, overrides: { // Pass event as argument. // Ref: https://repost.aws/questions/QU_WC7301mT8qR7ip_9cyjdQ/eventbridge-pipes-and-ecs-task containerOverrides: [ { // Only pass keys and load the object from within the ECS task. // https://github.com/aws-samples/bedrock-claude-chat/issues/190 command: ["-u", "embedding/main.py", "$.dynamodb.Keys"], name: taskDefinition.defaultContainer!.containerName, }, ], }, }, }, logConfiguration: { cloudwatchLogsLogDestination: { logGroupArn: pipeLogGroup.logGroupArn, }, level: "INFO", }, roleArn: pipeRole.roleArn, }); /** * Removal handler */ const removeHandlerRole = new iam.Role(this, "RemovalHandlerRole", { assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"), }); removeHandlerRole.addManagedPolicy( iam.ManagedPolicy.fromAwsManagedPolicyName( "service-role/AWSLambdaVPCAccessExecutionRole" ) ); removeHandlerRole.addToPolicy( new iam.PolicyStatement({ effect: iam.Effect.ALLOW, actions: [ "cloudformation:DescribeStacks", "cloudformation:DescribeStackEvents", "cloudformation:DescribeStackResource", "cloudformation:DescribeStackResources", "cloudformation:DeleteStack", ], resources: [`*`], }) ); removeHandlerRole.addToPolicy( new iam.PolicyStatement({ effect: iam.Effect.ALLOW, actions: [ "apigateway:GET", "apigateway:POST", "apigateway:PUT", "apigateway:DELETE", ], resources: [`arn:aws:apigateway:${Stack.of(this).region}::/*`], }) ); props.database.grantStreamRead(removeHandlerRole); props.documentBucket.grantReadWrite(removeHandlerRole); const removalHandler = new DockerImageFunction(this, "BotRemovalHandler", { code: DockerImageCode.fromImageAsset( path.join(__dirname, "../../../backend"), { platform: Platform.LINUX_AMD64, file: "websocket.Dockerfile", cmd: ["app.bot_remove.handler"], } ), vpc: props.vpc, vpcSubnets: { subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS }, timeout: Duration.minutes(1), environment: { DB_SECRETS_ARN: props.dbSecrets.secretArn, DOCUMENT_BUCKET: props.documentBucket.bucketName, }, role: removeHandlerRole, }); props.dbSecrets.grantRead(removalHandler); removalHandler.addEventSource( new DynamoEventSource(props.database, { startingPosition: lambda.StartingPosition.TRIM_HORIZON, batchSize: 1, retryAttempts: 2, filters: [ { pattern: '{"eventName":["REMOVE"]}', }, ], }) ); this.taskSecurityGroup = taskSg; this.container = container; this.removalHandler = removalHandler; new CfnOutput(this, "ClusterName", { value: cluster.clusterName, }); new CfnOutput(this, "TaskDefinitionName", { value: cdk.Fn.select( 1, cdk.Fn.split( "/", cdk.Fn.select(5, cdk.Fn.split(":", taskDefinition.taskDefinitionArn)) ) ), }); new CfnOutput(this, "TaskSecurityGroupId", { value: taskSg.securityGroupId, }); } }