packages/cdk/lib/transcription-service.ts
import { GuApiLambda } from '@guardian/cdk';
import { GuCertificate } from '@guardian/cdk/lib/constructs/acm';
import type { GuStackProps } from '@guardian/cdk/lib/constructs/core';
import {
GuAmiParameter,
GuDistributionBucketParameter,
GuLoggingStreamNameParameter,
GuStack,
GuStringParameter,
} from '@guardian/cdk/lib/constructs/core';
import { GuCname } from '@guardian/cdk/lib/constructs/dns';
import {
GuSecurityGroup,
GuVpc,
SubnetType,
} from '@guardian/cdk/lib/constructs/ec2';
import { GuEcsTask } from '@guardian/cdk/lib/constructs/ecs';
import {
GuAllowPolicy,
GuInstanceRole,
GuPolicy,
} from '@guardian/cdk/lib/constructs/iam';
import { GuLambdaFunction } from '@guardian/cdk/lib/constructs/lambda';
import { GuS3Bucket } from '@guardian/cdk/lib/constructs/s3';
import { MAX_RECEIVE_COUNT } from '@guardian/transcription-service-common';
import {
type App,
aws_events_targets,
CfnOutput,
Duration,
Fn,
RemovalPolicy,
Size,
Tags,
} from 'aws-cdk-lib';
import { EndpointType } from 'aws-cdk-lib/aws-apigateway';
import {
AutoScalingGroup,
BlockDeviceVolume,
GroupMetrics,
SpotAllocationStrategy,
} from 'aws-cdk-lib/aws-autoscaling';
import {
Alarm,
ComparisonOperator,
Metric,
TreatMissingData,
} from 'aws-cdk-lib/aws-cloudwatch';
import { SnsAction } from 'aws-cdk-lib/aws-cloudwatch-actions';
import { AttributeType, Table } from 'aws-cdk-lib/aws-dynamodb';
import {
InstanceClass,
InstanceSize,
InstanceType,
LaunchTemplate,
MachineImage,
Peer,
Port,
SpotInstanceInterruption,
UserData,
} from 'aws-cdk-lib/aws-ec2';
import { Repository } from 'aws-cdk-lib/aws-ecr';
import { Rule, Schedule } from 'aws-cdk-lib/aws-events';
import {
Effect,
PolicyStatement,
Role,
ServicePrincipal,
} from 'aws-cdk-lib/aws-iam';
import {
Architecture,
Code,
LayerVersion,
Runtime,
} from 'aws-cdk-lib/aws-lambda';
import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';
import { LogGroup } from 'aws-cdk-lib/aws-logs';
import { CfnPipe } from 'aws-cdk-lib/aws-pipes';
import { Bucket, HttpMethods } from 'aws-cdk-lib/aws-s3';
import { Secret } from 'aws-cdk-lib/aws-secretsmanager';
import { Topic } from 'aws-cdk-lib/aws-sns';
import { Queue } from 'aws-cdk-lib/aws-sqs';
import { StringParameter } from 'aws-cdk-lib/aws-ssm';
import { JsonPath } from 'aws-cdk-lib/aws-stepfunctions';
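// Extracts the topic name (the final ':'-separated segment) from an SNS topic ARN.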
const topicArnToName = (topicArn: string) => {
const split = topicArn.split(':');
return split[split.length - 1] ?? '';
};
export class TranscriptionService extends GuStack {
constructor(scope: App, id: string, props: GuStackProps) {
super(scope, id, props);
const APP_NAME = 'transcription-service';
const apiId = `${APP_NAME}-${props.stage}`;
const isProd = props.stage === 'PROD';
const workerAutoscalingGroupName = `transcription-service-workers-${this.stage}`;
const gpuWorkerAutoscalingGroupName = `transcription-service-gpu-workers-${this.stage}`;
if (!props.env?.region) throw new Error('region not provided in props');
const workerAmi = new GuAmiParameter(this, {
app: `${APP_NAME}-worker`,
description: 'AMI to use for the worker instances',
});
const gpuWorkerAmi = new GuAmiParameter(this, {
app: `${APP_NAME}-gpu-worker`,
description: 'AMI to use for the gpu worker instances',
});
const s3PrefixListId = new GuStringParameter(
this,
'S3PrefixListIdParameter',
{
fromSSM: true,
default: `/${this.stage}/${this.stack}/${APP_NAME}/s3PrefixListId`,
description:
'ID of the managed prefix list for the S3 service. See https://docs.aws.amazon.com/systems-manager/latest/userguide/setup-create-vpc.html',
},
);
const giantTranscriptionOutputQueueArn = new GuStringParameter(
this,
'GiantTranscriptionOutputQueueArn',
{
fromSSM: true,
default: `/${props.stage}/investigations/GiantTranscriptionOutputQueueArn`,
},
).valueAsString;
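// All of this app's runtime config in Parameter Store lives under this path; lambdas and
// workers are granted read access to it via getParametersPolicy below.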
const ssmPrefix = `arn:aws:ssm:${props.env.region}:${this.account}:parameter`;
const ssmPath = `${this.stage}/${this.stack}/${APP_NAME}`;
const domainName =
this.stage === 'PROD'
? 'transcribe.gutools.co.uk'
: 'transcribe.code.dev-gutools.co.uk';
const certificate = new GuCertificate(this, {
app: APP_NAME,
domainName: domainName,
});
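// Source media uploads and transcription output each get a dedicated bucket, with
// lifecycle rules so objects expire after 7 days.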
const sourceMediaBucket = new GuS3Bucket(
this,
'TranscriptionServiceSourceMediaBucket',
{
app: APP_NAME,
bucketName: `transcription-service-source-media-${this.stage.toLowerCase()}`,
cors: [
{
allowedOrigins: [`https://${domainName}`],
allowedMethods: [HttpMethods.PUT],
},
],
transferAcceleration: true,
},
);
sourceMediaBucket.addLifecycleRule({
expiration: Duration.days(7),
});
const outputBucket = new GuS3Bucket(
this,
'TranscriptionServiceOutputBucket',
{
app: APP_NAME,
bucketName: `transcription-service-output-${this.stage.toLowerCase()}`,
},
);
outputBucket.addLifecycleRule({
expiration: Duration.days(7),
});
// we only want one dev bucket, so we only create it on CODE
if (props.stage === 'CODE') {
const domainNameDev = 'transcribe.local.dev-gutools.co.uk';
const sourceMediaBucketDev = new GuS3Bucket(
this,
'TranscriptionServiceUploadsBucket',
{
app: APP_NAME,
bucketName: `transcription-service-source-media-dev`,
cors: [
{
allowedOrigins: [`https://${domainNameDev}`],
allowedMethods: [HttpMethods.PUT],
},
],
transferAcceleration: true,
},
);
sourceMediaBucketDev.addLifecycleRule({
expiration: Duration.days(1),
});
const transcriptionOutputBucketDev = new GuS3Bucket(
this,
'TranscriptionServiceOutputsBucket',
{
app: APP_NAME,
bucketName: `transcription-service-output-dev`,
},
);
transcriptionOutputBucketDev.addLifecycleRule({
expiration: Duration.days(7),
});
}
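// ffmpeg lambda layer: the zip is pushed to a shared layer bucket by the
// publish-ffmpeg-layer.sh script and referenced here by its key.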
const layerBucket = new GuStringParameter(this, 'LayerBucketArn', {
fromSSM: true,
default: '/investigations/transcription-service/lambdaLayerBucketArn',
});
const ffmpegHash = new GuStringParameter(this, 'FFMpegLayerZipKey', {
description:
"Key for the ffmpeg layer's zip file (pushed to layerBucket by publish-ffmpeg-layer.sh script)",
});
const ffmpegLayer = new LayerVersion(
this,
`FFMpegLayer_x86_64-${this.stage}`,
{
code: Code.fromBucket(
Bucket.fromBucketArn(
this,
'LambdaLayerBucket',
layerBucket.valueAsString,
),
ffmpegHash.valueAsString,
),
description: 'FFMpeg Layer',
layerVersionName: 'FFMpegLayer',
compatibleArchitectures: [Architecture.X86_64],
compatibleRuntimes: [
Runtime.NODEJS_LATEST,
Runtime.NODEJS_22_X,
Runtime.NODEJS_20_X,
Runtime.NODEJS_18_X,
],
},
);
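// The API lambda sits behind API Gateway on the custom domain and serves the transcription
// service frontend; it gets the ffmpeg layer and 10GiB of ephemeral storage so it can
// download source files and work out their duration.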
const apiLambda = new GuApiLambda(this, 'transcription-service-api', {
fileName: 'api.zip',
handler: 'index.api',
runtime: Runtime.NODEJS_20_X,
monitoringConfiguration: {
noMonitoring: true,
},
app: `${APP_NAME}-api`,
layers: [ffmpegLayer],
ephemeralStorageSize: Size.gibibytes(10), // needed so api can download source files to get the duration
api: {
id: apiId,
description: 'API for transcription service frontend',
domainName: {
certificate,
domainName,
endpointType: EndpointType.REGIONAL,
},
},
});
apiLambda.role?.attachInlinePolicy(
new GuPolicy(this, 'LambdaMediaUploadBucketInlinePolicy', {
statements: [
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['s3:GetObject', 's3:PutObject'],
resources: [`${sourceMediaBucket.bucketArn}/*`],
}),
],
}),
);
apiLambda.role?.attachInlinePolicy(
new GuPolicy(this, 'LambdaOutputBucketInlinePolicy', {
statements: [
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['s3:PutObject', 's3:GetObject'],
resources: [`${outputBucket.bucketArn}/*`],
}),
],
}),
);
const getParametersPolicy = new PolicyStatement({
effect: Effect.ALLOW,
actions: ['ssm:GetParameter', 'ssm:GetParametersByPath'],
resources: [`${ssmPrefix}/${ssmPath}/*`],
});
const putMetricDataPolicy = new PolicyStatement({
effect: Effect.ALLOW,
actions: ['cloudwatch:PutMetricData'],
resources: ['*'],
});
apiLambda.addToRolePolicy(getParametersPolicy);
apiLambda.addToRolePolicy(putMetricDataPolicy);
// The custom domain name mapped to this API
const apiDomain = apiLambda.api.domainName;
if (!apiDomain) throw new Error('api lambda domainName is undefined');
// CNAME mapping between API Gateway and the custom domain name
new GuCname(this, 'transcription DNS entry', {
app: APP_NAME,
domainName,
ttl: Duration.hours(1),
resourceRecord: apiDomain.domainNameAliasDomainName,
});
// worker output infrastructure
const transcriptionOutputQueue = new Queue(
this,
`${APP_NAME}-output-queue`,
{
queueName: `${APP_NAME}-output-queue-${this.stage}`,
},
);
// worker autoscaling group
const workerApp = `${APP_NAME}-worker`;
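// User data run at boot: fetch the worker .deb from the distribution bucket, install it
// and start the systemd service.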
const userData = UserData.forLinux({ shebang: '#!/bin/bash' });
const userDataCommands = [
`export STAGE=${props.stage}`,
`export AWS_REGION=${props.env.region}`,
`aws s3 cp s3://${GuDistributionBucketParameter.getInstance(this).valueAsString}/${props.stack}/${props.stage}/${workerApp}/transcription-service-worker_1.0.0_all.deb .`,
`dpkg -i transcription-service-worker_1.0.0_all.deb`,
`service transcription-service-worker start`,
].join('\n');
userData.addCommands(userDataCommands);
const loggingStreamName =
GuLoggingStreamNameParameter.getInstance(this).valueAsString;
const loggingStreamArn = this.formatArn({
service: 'kinesis',
resource: 'stream',
resourceName: loggingStreamName,
});
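// Shared instance role for CPU and GPU workers: read config from SSM, send results to the
// transcription and Giant output queues, ship logs to the central Kinesis stream, set
// scale-in protection on their own ASGs and publish CloudWatch metrics.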
const workerRole = new GuInstanceRole(this, {
app: workerApp,
additionalPolicies: [
new GuPolicy(this, 'WorkerGetParameters', {
statements: [getParametersPolicy],
}),
new GuAllowPolicy(this, 'WriteToDestinationTopic', {
actions: ['sqs:SendMessage'],
resources: [
transcriptionOutputQueue.queueArn,
giantTranscriptionOutputQueueArn,
],
}),
new GuAllowPolicy(this, 'WriteToELK', {
actions: [
'kinesis:DescribeStream',
'kinesis:PutRecord',
'kinesis:PutRecords',
],
resources: [loggingStreamArn],
}),
new GuAllowPolicy(this, 'SetInstanceProtection', {
actions: ['autoscaling:SetInstanceProtection'],
resources: [
`arn:aws:autoscaling:${props.env.region}:${this.account}:autoScalingGroup:*:autoScalingGroupName/${workerAutoscalingGroupName}`,
`arn:aws:autoscaling:${props.env.region}:${this.account}:autoScalingGroup:*:autoScalingGroupName/${gpuWorkerAutoscalingGroupName}`,
],
}),
new GuAllowPolicy(this, 'WriteCloudwatch', {
actions: ['cloudwatch:PutMetricData'],
resources: ['*'],
}),
],
});
const vpc = GuVpc.fromIdParameter(
this,
'InvestigationsInternetEnabledVpc',
{
availabilityZones: ['eu-west-1a', 'eu-west-1b', 'eu-west-1c'],
},
);
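// Workers get restricted egress only: HTTPS to the VPC's AWS service endpoints and to S3
// via the managed prefix list.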
const workerSecurityGroup = new GuSecurityGroup(
this,
`TranscriptionServiceWorkerSG`,
{
app: workerApp,
vpc,
allowAllOutbound: false,
},
);
const privateEndpointSecurityGroup = Fn.importValue(
`internet-enabled-vpc-AWSEndpointSecurityGroup`,
);
workerSecurityGroup.addEgressRule(
Peer.securityGroupId(privateEndpointSecurityGroup),
Port.tcp(443),
);
workerSecurityGroup.addEgressRule(
Peer.prefixList(s3PrefixListId.valueAsString),
Port.tcp(443),
);
const commonLaunchTemplateProps = {
// include tags in instance metadata so that we can work out the STAGE
instanceMetadataTags: true,
userData,
role: workerRole,
securityGroup: workerSecurityGroup,
};
const cpuWorkerLaunchTemplate = new LaunchTemplate(
this,
'TranscriptionWorkerLaunchTemplate',
{
...commonLaunchTemplateProps,
machineImage: MachineImage.genericLinux({
'eu-west-1': workerAmi.valueAsString,
}),
instanceType: InstanceType.of(InstanceClass.C7G, InstanceSize.XLARGE4),
// the size of this block device will determine the max input file size for transcription. In future we could
// attach the block device on startup once we know how large the file to be transcribed is, or try some kind
// of streaming approach to the transcription so we don't need the whole file on disk
blockDevices: [
{
deviceName: '/dev/sda1',
// assuming that we intend to support video files, 50GB seems a reasonable starting point
volume: BlockDeviceVolume.ebs(50),
},
],
},
);
const gpuWorkerLaunchTemplate = new LaunchTemplate(
this,
'TranscriptionWorkerGPULaunchTemplate',
{
...commonLaunchTemplateProps,
machineImage: MachineImage.genericLinux({
'eu-west-1': gpuWorkerAmi.valueAsString,
}),
instanceType: InstanceType.of(InstanceClass.G4DN, InstanceSize.XLARGE),
blockDevices: [
{
deviceName: '/dev/sda1',
// The AMI with the nvidia cuda drivers and whisperx installed is enormous
volume: BlockDeviceVolume.ebs(100),
},
],
},
);
// instance types we are happy to use for workers. Note: order matters. When launching 'on demand'
// instances, the ASG will start at the top of the list and work down until it manages to launch an instance
const acceptableInstanceTypes = isProd
? [
InstanceType.of(InstanceClass.C7G, InstanceSize.XLARGE4),
InstanceType.of(InstanceClass.C6G, InstanceSize.XLARGE4),
InstanceType.of(InstanceClass.M7G, InstanceSize.XLARGE4),
InstanceType.of(InstanceClass.C7G, InstanceSize.XLARGE8),
InstanceType.of(InstanceClass.C6G, InstanceSize.XLARGE8),
]
: [InstanceType.of(InstanceClass.T4G, InstanceSize.MEDIUM)];
const gpuInstanceTypes = isProd
? [
InstanceType.of(InstanceClass.G4DN, InstanceSize.XLARGE),
InstanceType.of(InstanceClass.G4DN, InstanceSize.XLARGE2),
InstanceType.of(InstanceClass.G5, InstanceSize.XLARGE),
]
: [InstanceType.of(InstanceClass.G4DN, InstanceSize.XLARGE)];
const guSubnets = GuVpc.subnetsFromParameter(this, {
type: SubnetType.PRIVATE,
app: workerApp,
});
const instanceTypeToOverride = (instanceType: InstanceType) => ({
instanceType,
spotOptions: {
interruptionBehavior: SpotInstanceInterruption.TERMINATE,
},
});
const commonAsgProps = {
minCapacity: 0,
maxCapacity: isProd ? 20 : 4,
vpc,
vpcSubnets: {
subnets: guSubnets,
},
groupMetrics: [GroupMetrics.all()],
};
const commonInstancesDistributionProps = {
// 0 is the default, including this here just to make it more obvious what's happening
onDemandBaseCapacity: 0,
// if this value is set to 100 then we won't use spot instances at all; if it is 0 then we use 100% spot
onDemandPercentageAboveBaseCapacity: 10,
spotAllocationStrategy: SpotAllocationStrategy.CAPACITY_OPTIMIZED,
};
// unfortunately GuAutoscalingGroup doesn't support having a mixedInstancesPolicy so using the basic ASG here
const transcriptionWorkerASG = new AutoScalingGroup(
this,
'TranscriptionWorkerASG',
{
...commonAsgProps,
autoScalingGroupName: workerAutoscalingGroupName,
mixedInstancesPolicy: {
launchTemplate: cpuWorkerLaunchTemplate,
instancesDistribution: {
...commonInstancesDistributionProps,
spotMaxPrice: '0.6202',
},
launchTemplateOverrides: acceptableInstanceTypes.map(
instanceTypeToOverride,
),
},
},
);
const transcriptionGpuWorkerASG = new AutoScalingGroup(
this,
'TranscriptionGpuWorkerASG',
{
...commonAsgProps,
autoScalingGroupName: gpuWorkerAutoscalingGroupName,
mixedInstancesPolicy: {
launchTemplate: gpuWorkerLaunchTemplate,
instancesDistribution: {
...commonInstancesDistributionProps,
spotMaxPrice: '0.5260',
},
launchTemplateOverrides: gpuInstanceTypes.map(instanceTypeToOverride),
},
},
);
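// Instance tags: LogKinesisStreamName and SystemdUnit are presumably read by the log
// shipping agent on the instances; App distinguishes the CPU and GPU worker flavours.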
Tags.of(transcriptionWorkerASG).add(
'LogKinesisStreamName',
GuLoggingStreamNameParameter.getInstance(this).valueAsString,
{ applyToLaunchedInstances: true },
);
Tags.of(transcriptionGpuWorkerASG).add(
'LogKinesisStreamName',
GuLoggingStreamNameParameter.getInstance(this).valueAsString,
{ applyToLaunchedInstances: true },
);
Tags.of(transcriptionWorkerASG).add('SystemdUnit', `${workerApp}.service`, {
applyToLaunchedInstances: true,
});
Tags.of(transcriptionWorkerASG).add('App', `transcription-service-worker`, {
applyToLaunchedInstances: true,
});
Tags.of(transcriptionGpuWorkerASG).add(
'SystemdUnit',
`${workerApp}.service`,
{
applyToLaunchedInstances: true,
},
);
Tags.of(transcriptionGpuWorkerASG).add(
'App',
`transcription-service-gpu-worker`,
{
applyToLaunchedInstances: true,
},
);
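// Transcription tasks flow through FIFO queues (one for CPU workers, one for GPU workers)
// that share this dead letter queue; messages that exceed MAX_RECEIVE_COUNT receives land
// here and trigger the alarm defined further down.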
const transcriptionDeadLetterQueue = new Queue(
this,
`${APP_NAME}-task-dead-letter-queue`,
{
fifo: true,
queueName: `${APP_NAME}-task-dead-letter-queue-${this.stage}.fifo`,
contentBasedDeduplication: true,
},
);
// SQS queue for transcription tasks from API lambda to worker EC2 instances
const taskQueueProps = {
fifo: true,
queueName: `${APP_NAME}-task-queue-${this.stage}.fifo`,
// this is the default. 30 seconds should be enough time to get the
// size of the file from s3 and estimate transcription time. If it's
// not, we'll need to increase visibilityTimeout
visibilityTimeout: Duration.seconds(30),
// contentBasedDeduplication takes a sha-256 hash of the message body to use as the deduplication ID. In future
// we might choose to use a hash of the actual file to be transcribed instead (but I can't really think where
// that would be particularly helpful)
contentBasedDeduplication: true,
deadLetterQueue: {
queue: transcriptionDeadLetterQueue,
maxReceiveCount: MAX_RECEIVE_COUNT,
},
};
const transcriptionTaskQueue = new Queue(
this,
`${APP_NAME}-task-queue`,
taskQueueProps,
);
const transcriptionGpuTaskQueue = new Queue(
this,
`${APP_NAME}-gpu-task-queue`,
{
...taskQueueProps,
queueName: `${APP_NAME}-gpu-task-queue-${this.stage}.fifo`,
},
);
new StringParameter(this, 'GPUTaskQueueUrlParameter', {
parameterName: `/${ssmPath}/gpuTaskQueueUrl`,
stringValue: transcriptionGpuTaskQueue.queueUrl,
});
// allow API lambda to write to queue
transcriptionTaskQueue.grantSendMessages(apiLambda);
transcriptionGpuTaskQueue.grantSendMessages(apiLambda);
// allow worker to receive message from queue
transcriptionTaskQueue.grantConsumeMessages(transcriptionWorkerASG);
transcriptionGpuTaskQueue.grantConsumeMessages(transcriptionGpuWorkerASG);
// allow worker to write messages to the dead letter queue
transcriptionDeadLetterQueue.grantSendMessages(transcriptionWorkerASG);
transcriptionDeadLetterQueue.grantSendMessages(transcriptionGpuWorkerASG);
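// media-download: the API lambda sends download jobs to a FIFO queue, which an EventBridge
// pipe (below) turns into runs of an ECS task.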
const mediaDownloadDeadLetterQueue = new Queue(
this,
`${APP_NAME}-media-download-dead-letter-queue`,
{
fifo: true,
queueName: `${APP_NAME}-media-download-dead-letter-queue-${this.stage}.fifo`,
contentBasedDeduplication: true,
},
);
// SQS queue for media download tasks from API lambda to media-download service
const mediaDownloadTaskQueue = new Queue(
this,
`${APP_NAME}-media-download-task-queue`,
{
fifo: true,
queueName: `${APP_NAME}-media-download-task-queue-${this.stage}.fifo`,
visibilityTimeout: Duration.seconds(30),
contentBasedDeduplication: true,
deadLetterQueue: {
queue: mediaDownloadDeadLetterQueue,
maxReceiveCount: MAX_RECEIVE_COUNT,
},
},
);
mediaDownloadTaskQueue.grantSendMessages(apiLambda);
const mediaDownloadApp = 'media-download';
const sshKeySecret = new Secret(this, 'media-download-ssh-key', {
secretName: `media-download-ssh-key-${this.stage}`,
});
const alarmTopicArn = new GuStringParameter(
this,
'InvestigationsAlarmTopicArn',
{
fromSSM: true,
default: `/${props.stage}/investigations/alarmTopicArn`,
},
).valueAsString;
const alarmTopicName = topicArnToName(alarmTopicArn);
const mediaDownloadTask = new GuEcsTask(this, 'media-download-task', {
app: mediaDownloadApp,
vpc,
subnets: GuVpc.subnetsFromParameterFixedNumber(
this,
{
type: SubnetType.PRIVATE,
app: mediaDownloadApp,
},
3,
),
containerConfiguration: {
repository: Repository.fromRepositoryName(
this,
'MediaDownloadRepository',
`transcription-service-${mediaDownloadApp}`,
),
type: 'repository',
version: process.env['CONTAINER_VERSION'] ?? 'main',
},
taskTimeoutInMinutes: 120,
monitoringConfiguration:
this.stage === 'PROD'
? {
noMonitoring: false,
snsTopicArn: alarmTopicArn,
}
: { noMonitoring: true },
securityGroups: [
new GuSecurityGroup(this, 'media-download-sg', {
vpc,
allowAllOutbound: true,
app: mediaDownloadApp,
}),
],
customTaskPolicies: [
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['sqs:ReceiveMessage'],
resources: [mediaDownloadTaskQueue.queueArn],
}),
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['sqs:SendMessage'],
resources: [
transcriptionTaskQueue.queueArn,
transcriptionGpuTaskQueue.queueArn,
transcriptionOutputQueue.queueArn,
],
}),
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['secretsmanager:GetSecretValue'],
resources: [sshKeySecret.secretArn],
}),
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['s3:PutObject'],
resources: [`${outputBucket.bucketArn}/*`],
}),
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['s3:PutObject', 's3:GetObject'],
resources: [`${sourceMediaBucket.bucketArn}/downloaded-media/*`],
}),
getParametersPolicy,
],
storage: 50,
enableDistributablePolicy: false,
environmentOverrides: [
{
name: 'MESSAGE_BODY',
value: JsonPath.stringAt('$[0].body'),
},
{
name: 'AWS_REGION',
value: this.region,
},
{
name: 'STAGE',
value: this.stage,
},
{
name: 'APP',
value: mediaDownloadApp,
},
],
});
const downloadVolume = {
name: `${mediaDownloadApp}-download-volume`,
};
const tempVolume = {
name: `${mediaDownloadApp}-temp-volume`,
};
mediaDownloadTask.taskDefinition.addVolume(downloadVolume);
mediaDownloadTask.taskDefinition.addVolume(tempVolume);
mediaDownloadTask.containerDefinition.addMountPoints({
sourceVolume: downloadVolume.name,
containerPath: '/media-download', // needs to match ECS_MEDIA_DOWNLOAD_WORKING_DIRECTORY in media-download index.ts
readOnly: false,
});
mediaDownloadTask.containerDefinition.addMountPoints({
sourceVolume: tempVolume.name,
containerPath: '/tmp', // needed by yt-dlp
readOnly: false,
});
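// EventBridge pipe wiring the media download queue to the task's state machine: one message
// per execution, fire and forget, with INFO-level logging.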
const pipeRole = new Role(this, 'eventbridge-pipe-role', {
assumedBy: new ServicePrincipal('pipes.amazonaws.com'),
});
new GuAllowPolicy(this, 'sqs-read', {
actions: [
'sqs:ReceiveMessage',
'sqs:DeleteMessage',
'sqs:GetQueueAttributes',
],
resources: [mediaDownloadTaskQueue.queueArn],
roles: [pipeRole],
});
new GuAllowPolicy(this, 'sfn-start', {
actions: ['states:StartExecution'],
resources: [mediaDownloadTask.stateMachine.stateMachineArn],
roles: [pipeRole],
});
const logGroup = new LogGroup(this, 'media-download-queue-sfn-pipe-log', {
logGroupName: `/aws/pipes/${this.stage}/media-download-queue-sfn-pipe`,
retention: 7,
removalPolicy: RemovalPolicy.SNAPSHOT,
});
new CfnPipe(this, 'media-download-sqs-sfn', {
source: mediaDownloadTaskQueue.queueArn,
target: mediaDownloadTask.stateMachine.stateMachineArn,
targetParameters: {
stepFunctionStateMachineParameters: {
invocationType: 'FIRE_AND_FORGET',
},
},
roleArn: pipeRole.roleArn,
name: `media-download-queue-sfn-pipe-${this.stage}`,
desiredState: 'RUNNING',
logConfiguration: {
cloudwatchLogsLogDestination: {
logGroupArn: logGroup.logGroupArn,
},
level: 'INFO',
},
sourceParameters: {
sqsQueueParameters: {
batchSize: 1,
},
},
description:
'Pipe to trigger the media download service from the associated SQS queue.',
});
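// DynamoDB table keyed by id, read and written by the API, output handler and media export lambdas.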
const transcriptTable = new Table(this, 'TranscriptTable', {
tableName: `${APP_NAME}-${this.stage}`,
partitionKey: {
name: 'id',
type: AttributeType.STRING,
},
readCapacity: 1,
writeCapacity: 1,
});
// Enable nightly backups (via https://github.com/guardian/aws-backup)
Tags.of(transcriptTable).add('devx-backup-enabled', 'true');
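// Output handler lambda: consumes the worker output queue, updates the transcript table and
// sends result emails via SES.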
const outputHandlerLambda = new GuLambdaFunction(
this,
'transcription-service-output-handler',
{
fileName: 'output-handler.zip',
handler: 'index.outputHandler',
runtime: Runtime.NODEJS_20_X,
app: `${APP_NAME}-output-handler`,
errorPercentageMonitoring:
this.stage === 'PROD'
? {
toleratedErrorPercentage: 0,
noMonitoring: false,
snsTopicName: alarmTopicName,
}
: undefined,
},
);
transcriptTable.grantReadWriteData(outputHandlerLambda);
transcriptTable.grantReadWriteData(apiLambda);
// trigger output-handler lambda from queue
outputHandlerLambda.addEventSource(
new SqsEventSource(transcriptionOutputQueue),
);
outputHandlerLambda.addToRolePolicy(
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['ses:SendEmail', 'ses:SendRawEmail'],
resources: ['*'],
}),
);
outputHandlerLambda.addToRolePolicy(
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['s3:GetObject'],
resources: [
`${outputBucket.bucketArn}/*`,
`${sourceMediaBucket.bucketArn}/*`,
],
}),
);
outputHandlerLambda.addToRolePolicy(getParametersPolicy);
outputHandlerLambda.addToRolePolicy(putMetricDataPolicy);
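// Media export lambda: reads source media from S3 and is invoked directly by the API lambda,
// which looks its name up from the parameter created below.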
const mediaExportLambda = new GuLambdaFunction(
this,
'transcription-service-media-export',
{
fileName: 'media-export.zip',
handler: 'index.mediaExport',
runtime: Runtime.NODEJS_20_X,
app: `${APP_NAME}-media-export`,
ephemeralStorageSize: Size.mebibytes(10240),
memorySize: 2048,
timeout: Duration.seconds(900),
errorPercentageMonitoring:
this.stage === 'PROD'
? {
toleratedErrorPercentage: 0,
noMonitoring: false,
snsTopicName: alarmTopicName,
}
: undefined,
},
);
new StringParameter(this, 'ExportFunctionName', {
parameterName: `/${ssmPath}/app/mediaExportFunctionName`,
stringValue: mediaExportLambda.functionName,
});
mediaExportLambda.addToRolePolicy(getParametersPolicy);
mediaExportLambda.addToRolePolicy(
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['s3:GetObject'],
resources: [`${sourceMediaBucket.bucketArn}/*`],
}),
);
transcriptTable.grantReadWriteData(mediaExportLambda);
mediaExportLambda.grantInvoke(apiLambda);
new CfnOutput(this, 'WorkerRoleArn', {
exportName: `WorkerRoleArn-${props.stage}`,
value: workerRole.roleArn,
});
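// Scheduled lambda (every minute) that sets the desired capacity of the worker ASGs based
// on the task queue lengths.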
const workerCapacityManagerLambda = new GuLambdaFunction(
this,
'transcription-service-worker-capacity-manager',
{
fileName: 'worker-capacity-manager.zip',
handler: 'index.workerCapacityManager',
runtime: Runtime.NODEJS_20_X,
app: `${APP_NAME}-worker-capacity-manager`,
},
);
workerCapacityManagerLambda.addToRolePolicy(
new PolicyStatement({
effect: Effect.ALLOW,
actions: [
'autoscaling:SetDesiredCapacity',
'autoscaling:DescribeAutoScalingInstances',
],
resources: [
transcriptionWorkerASG.autoScalingGroupArn,
transcriptionGpuWorkerASG.autoScalingGroupArn,
],
}),
);
workerCapacityManagerLambda.addToRolePolicy(
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['autoscaling:DescribeAutoScalingGroups'],
resources: ['*'],
}),
);
workerCapacityManagerLambda.addToRolePolicy(
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['sqs:GetQueueAttributes'],
resources: [
transcriptionTaskQueue.queueArn,
transcriptionGpuTaskQueue.queueArn,
],
}),
);
workerCapacityManagerLambda.addToRolePolicy(getParametersPolicy);
new Rule(this, 'worker-capacity-manager-rule', {
description:
'Manages worker capacity by updating the desired capacity of the worker ASGs based on queue length',
targets: [
new aws_events_targets.LambdaFunction(workerCapacityManagerLambda),
],
schedule: Schedule.rate(Duration.minutes(1)),
});
// alarms
if (isProd) {
const alarms = [
// alarm when a message is added to the dead letter queue
// note that queue metrics go to 'sleep' if it is empty for more than 6 hours, so it may take up to 16 minutes
// for this alarm to trigger - see https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-monitoring-using-cloudwatch.html
new Alarm(this, 'DeadLetterQueueAlarm', {
alarmName: `transcription-service-dead-letter-queue-${props.stage}`,
metric:
transcriptionDeadLetterQueue.metricApproximateNumberOfMessagesVisible(
{ period: Duration.minutes(1), statistic: 'max' },
),
threshold: 1,
evaluationPeriods: 1,
actionsEnabled: true,
alarmDescription: `A transcription job has been sent to the dead letter queue. This may be because ffmpeg can't convert the file (maybe it's a JPEG) or because the transcription job has failed multiple times.`,
treatMissingData: TreatMissingData.IGNORE,
comparisonOperator:
ComparisonOperator.GREATER_THAN_OR_EQUAL_TO_THRESHOLD,
}),
// alarm when failure metric is greater than 0
new Alarm(this, 'FailureAlarm', {
alarmName: `transcription-service-failure-${props.stage}`,
// reference the custom metric created in metrics.ts library
metric: new Metric({
namespace: 'TranscriptionService',
metricName: 'Failure',
dimensionsMap: {
Stage: props.stage,
},
statistic: 'sum',
period: Duration.minutes(1),
}),
threshold: 1,
comparisonOperator:
ComparisonOperator.GREATER_THAN_OR_EQUAL_TO_THRESHOLD,
evaluationPeriods: 1,
actionsEnabled: true,
alarmDescription: 'A transcription service failure has occurred',
treatMissingData: TreatMissingData.IGNORE,
}),
// alarm when at least one instance has been running in the worker asg during every 5 minute period for
// more than 12 hours
new Alarm(this, 'WorkerInstanceAlarm', {
alarmName: `transcription-service-worker-instances-${props.stage}`,
// this doesn't actually create the metric - just a reference to it
metric: new Metric({
namespace: 'AWS/AutoScaling',
metricName: 'GroupTotalInstances',
dimensionsMap: {
AutoScalingGroupName: transcriptionWorkerASG.autoScalingGroupName,
},
statistic: 'min',
period: Duration.minutes(5),
}),
threshold: 1,
comparisonOperator:
ComparisonOperator.GREATER_THAN_OR_EQUAL_TO_THRESHOLD,
evaluationPeriods: 12 * 12, // 12 hours as metric has period of 5 minutes
actionsEnabled: true,
alarmDescription: `There has been at least 1 worker instance running for 12 hours.
This could mean that a worker is failing to be scaled in, which could have significant cost implications.
Please check that all running workers are doing something useful.`,
treatMissingData: TreatMissingData.IGNORE,
}),
];
const snsAction = new SnsAction(
Topic.fromTopicArn(this, 'TranscriptionAlarmTopic', alarmTopicArn),
);
alarms.forEach((alarm) => {
alarm.addAlarmAction(snsAction);
});
}
}
}
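// A minimal sketch of how this stack might be instantiated from the CDK app entrypoint.
// The construct id, stack and stage values below are illustrative assumptions rather than
// a copy of this repo's actual cdk.ts:
//
//   import { App } from 'aws-cdk-lib';
//   const app = new App();
//   new TranscriptionService(app, 'TranscriptionService-PROD', {
//     stack: 'investigations',
//     stage: 'PROD',
//     env: { region: 'eu-west-1' },
//   });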