packages/cdk/lib/cloudquery/task.ts (298 lines of code) (raw):

import type { AppIdentity, GuStack } from '@guardian/cdk/lib/constructs/core';
import type { GuSecurityGroup } from '@guardian/cdk/lib/constructs/ec2';
import { Duration, Tags } from 'aws-cdk-lib';
import type { ISecurityGroup } from 'aws-cdk-lib/aws-ec2';
import {
	ContainerDependencyCondition,
	FargateTaskDefinition,
	FireLensLogDriver,
	FirelensLogRouterType,
	LogDrivers,
	PropagatedTagSource,
	Secret,
} from 'aws-cdk-lib/aws-ecs';
import type { Cluster, RepositoryImage, Volume } from 'aws-cdk-lib/aws-ecs';
import type { ScheduledFargateTaskProps } from 'aws-cdk-lib/aws-ecs-patterns';
import { ScheduledFargateTask } from 'aws-cdk-lib/aws-ecs-patterns';
import type { IManagedPolicy, PolicyStatement } from 'aws-cdk-lib/aws-iam';
import { ManagedPolicy, Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam';
import { RetentionDays } from 'aws-cdk-lib/aws-logs';
import type { DatabaseInstance } from 'aws-cdk-lib/aws-rds';
import { dump } from 'js-yaml';
import type { CloudqueryConfig, CloudqueryWriteMode } from './config';
import {
	postgresDestinationConfig,
	serviceCatalogueConfigDirectory,
} from './config';
import { Images } from './images';
import { singletonPolicy } from './policies';
import { scheduleFrequencyMs } from './schedule';

/**
 * Properties accepted by {@link ScheduledCloudqueryTask}.
 *
 * Extends the standard scheduled Fargate task props, replacing `cluster`
 * with a required concrete `Cluster`.
 */
export interface ScheduledCloudqueryTaskProps
	extends AppIdentity,
		Omit<ScheduledFargateTaskProps, 'cluster'> {
	/**
	 * The name of the task.
	 * This will get added to the `Name` tag of the task definition.
	 */
	name: string;

	/**
	 * The Postgres database for CloudQuery to connect to.
	 */
	db: DatabaseInstance;

	/**
	 * The security group to allow CloudQuery to connect to the database.
	 */
	dbAccess: GuSecurityGroup;

	/**
	 * The ECS cluster to run the task in.
	 */
	cluster: Cluster;

	/**
	 * The name of the Kinesis stream to send logs to.
	 */
	loggingStreamName: string;

	/**
	 * Any IAM managed policies to attach to the task.
	 */
	managedPolicies: IManagedPolicy[];

	/**
	 * IAM policies to attach to the task.
	 */
	policies: PolicyStatement[];

	/**
	 * The CloudQuery config to use to collect data from.
	 *
	 * @see https://docs.cloudquery.io/docs/reference/source-spec
	 */
	sourceConfig: CloudqueryConfig;

	/**
	 * Any secrets to pass to the CloudQuery container.
	 *
	 * @see https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_ecs.ContainerDefinitionOptions.html#secrets
	 * @see https://repost.aws/knowledge-center/ecs-data-security-container-task
	 */
	secrets?: Record<string, Secret>;

	/**
	 * Any additional commands to run within the CloudQuery container.
	 * These are executed first.
	 *
	 * The container's filesystem is mostly read-only. If you need to write files you can use the /usr/share/cloudquery folder.
	 */
	additionalCommands?: string[];

	/**
	 * Any additional security groups applied to the task.
	 * For example, a group allowing access to Riff-Raff.
	 */
	additionalSecurityGroups?: ISecurityGroup[];

	/**
	 * Run this task as a singleton?
	 * Useful to help avoid overlapping runs.
	 */
	runAsSingleton: boolean;

	/**
	 * The CloudQuery API key, stored in AWS Secrets Manager.
	 *
	 * @see https://docs.cloudquery.io/docs/deployment/generate-api-key
	 * @see https://cloud.cloudquery.io/teams/the-guardian/api-keys
	 */
	cloudQueryApiKey: Secret;

	/**
	 * The image of a CloudQuery plugin that is distributed via Docker,
	 * i.e. plugins not written in Go.
	 *
	 * This image will be run on its own, exposing the GRPC server on localhost:7777.
	 * The CloudQuery source config should be configured with a registry of grpc, and path of localhost:7777.
	 *
	 * @see https://docs.cloudquery.io/docs/reference/source-spec
	 */
	dockerDistributedPluginImage?: RepositoryImage;

	/**
	 * Specifies the update method to use when inserting rows to Postgres.
	 */
	writeMode: CloudqueryWriteMode;
}

/**
 * A scheduled Fargate task that runs a CloudQuery sync against Postgres.
 *
 * The task definition is assembled from several containers:
 * - the CloudQuery container itself, which writes its source/destination
 *   YAML to a mounted volume and then runs `cloudquery sync`;
 * - an OpenTelemetry collector that CloudQuery waits on until healthy;
 * - optionally, a Docker-distributed plugin container;
 * - optionally, a "singleton" guard container that must exit successfully
 *   before CloudQuery starts;
 * - a Postgres client container that records the collection frequency of
 *   each configured table (only added when tables are configured);
 * - a FireLens (Fluent Bit) log router.
 */
export class ScheduledCloudqueryTask extends ScheduledFargateTask {
	/** The CloudQuery source config this task was created with. */
	public readonly sourceConfig: CloudqueryConfig;

	/**
	 * @param scope - The stack to create the task (and its role) in.
	 * @param id - Construct id; also used as the prefix for container ids.
	 * @param props - See {@link ScheduledCloudqueryTaskProps}.
	 * @throws Error if the database instance has no associated secret.
	 */
	constructor(scope: GuStack, id: string, props: ScheduledCloudqueryTaskProps) {
		const {
			name,
			db,
			cluster,
			app,
			dbAccess,
			schedule,
			managedPolicies,
			policies,
			loggingStreamName,
			sourceConfig,
			enabled,
			secrets,
			additionalCommands = [],
			memoryLimitMiB = 512,
			cpu,
			additionalSecurityGroups = [],
			runAsSingleton,
			cloudQueryApiKey,
			dockerDistributedPluginImage,
			writeMode,
		} = props;
		const { region, stack, stage } = scope;
		const thisRepo = 'guardian/service-catalogue'; // TODO get this from GuStack
		const frequency = scheduleFrequencyMs(schedule);

		const roleName = `${app}-${stage}-task-${name}`;
		const taskRole = new Role(scope, roleName, {
			assumedBy: new ServicePrincipal('ecs-tasks.amazonaws.com'),
			roleName,
		});

		const xrayPolicy = ManagedPolicy.fromAwsManagedPolicyName(
			'AWSXrayWriteOnlyAccess',
		);

		const task = new FargateTaskDefinition(scope, `${id}TaskDefinition`, {
			memoryLimitMiB,
			cpu,
			taskRole,
		});

		/*
		A scheduled task (i.e. `this`) cannot be tagged, so we tag the task definition instead.
		 */
		Tags.of(task).add('Name', name);

		const destinationConfig = postgresDestinationConfig(writeMode);

		/*
		This error shouldn't ever be thrown as AWS CDK creates a secret by default,
		it is just typed as optional.
		See https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_rds.DatabaseInstance.html#credentials.
		TODO: Remove this once IAM auth is working.
		 */
		if (!db.secret) {
			throw new Error('DB Secret is missing');
		}

		// Shared by every container except the log router itself.
		const fireLensLogDriver = new FireLensLogDriver({
			options: {
				Name: `kinesis_streams`,
				region,
				stream: loggingStreamName,
				retry_limit: '2',
			},
		});

		const cloudqueryTask = task.addContainer(`${id}Container`, {
			image: Images.cloudquery,
			entryPoint: [''],
			environment: {
				// Cap the Go runtime's memory at 80% of the task's memory limit.
				GOMEMLIMIT: `${Math.floor(memoryLimitMiB * 0.8)}MiB`,
			},
			secrets: {
				...secrets,
				DB_USERNAME: Secret.fromSecretsManager(db.secret, 'username'),
				DB_HOST: Secret.fromSecretsManager(db.secret, 'host'),
				DB_PASSWORD: Secret.fromSecretsManager(db.secret, 'password'),
				CLOUDQUERY_API_KEY: cloudQueryApiKey,
			},
			dockerLabels: {
				Stack: stack,
				Stage: stage,
				App: app,
				Name: name,
			},
			readonlyRootFilesystem: true,
			command: [
				'/bin/sh',
				'-c',
				[
					// Caller-supplied commands run first, then the config files are
					// written to the mounted config volume, then the sync starts.
					// NOTE(review): the YAML is used as the printf format string, so a
					// `%` or `'` in the dumped config would presumably be misread by
					// the shell/printf - confirm configs never contain them.
					...additionalCommands,
					`printf '${dump(sourceConfig)}' > ${serviceCatalogueConfigDirectory}/source.yaml`,
					`printf '${dump(destinationConfig)}' > ${serviceCatalogueConfigDirectory}/destination.yaml`,
					`/app/cloudquery sync ${serviceCatalogueConfigDirectory}/source.yaml ${serviceCatalogueConfigDirectory}/destination.yaml --log-format json --log-console --no-log-file`,
				].join(';'),
			],
			logging: fireLensLogDriver,
		});

		// The root filesystem is read-only, so every writable path is a volume.
		const configVolume: Volume = {
			name: 'config-volume',
		};
		task.addVolume(configVolume);

		const cqVolume: Volume = {
			name: 'cloudquery-volume',
		};
		task.addVolume(cqVolume);

		const tmpVolume: Volume = {
			name: 'tmp-volume',
		};
		task.addVolume(tmpVolume);

		cloudqueryTask.addMountPoints(
			{
				// So that we can write task config to this directory
				containerPath: serviceCatalogueConfigDirectory,
				sourceVolume: configVolume.name,
				readOnly: false,
			},
			{
				// So that Cloudquery can write to this directory
				containerPath: '/app/.cq',
				sourceVolume: cqVolume.name,
				readOnly: false,
			},
			{
				// So that Cloudquery can write temporary data
				containerPath: '/tmp',
				sourceVolume: tmpVolume.name,
				readOnly: false,
			},
		);

		const otel = task.addContainer(`${id}AWSOTELCollector`, {
			image: Images.otelCollector,
			command: ['--config=/etc/ecs/ecs-xray.yaml'],
			logging: fireLensLogDriver,
			healthCheck: {
				command: ['CMD', '/healthcheck'],
				interval: Duration.seconds(5),
			},
			portMappings: [
				{
					containerPort: 4318,
				},
			],
			readonlyRootFilesystem: true,
		});

		// CloudQuery only starts once the collector reports healthy.
		cloudqueryTask.addContainerDependencies({
			container: otel,
			condition: ContainerDependencyCondition.HEALTHY,
		});

		if (dockerDistributedPluginImage) {
			const additionalCloudQueryContainer = task.addContainer(
				`${id}PluginContainer`,
				{
					image: dockerDistributedPluginImage,
					logging: fireLensLogDriver,
					essential: false,
					readonlyRootFilesystem: true,
				},
			);

			cloudqueryTask.addContainerDependencies({
				container: additionalCloudQueryContainer,
				condition: ContainerDependencyCondition.START,
			});
		}

		if (runAsSingleton) {
			const operationInProgress = 114;
			const success = 0;

			const singletonTask = task.addContainer(`${id}AwsCli`, {
				image: Images.singletonImage,
				entryPoint: [''],
				command: [
					'/bin/bash',
					'-c',
					[
						// Who am I?
						`ECS_CLUSTER=$(curl -s $ECS_CONTAINER_METADATA_URI/task | jq -r '.Cluster')`,
						`ECS_FAMILY=$(curl -s $ECS_CONTAINER_METADATA_URI/task | jq -r '.Family')`,
						`ECS_TASK_ARN=$(curl -s $ECS_CONTAINER_METADATA_URI/task | jq -r '.TaskARN')`,

						// How many more of me are there?
						`RUNNING=$(aws ecs list-tasks --cluster $ECS_CLUSTER --family $ECS_FAMILY | jq '.taskArns | length')`,

						// Exit zero (successful) if I'm the only one running.
						// `-gt` compares numerically; `>` inside `[[ ]]` would compare
						// the count as a string.
						`[[ $\{RUNNING} -gt 1 ]] && exit ${operationInProgress} || exit ${success}`,
					].join(';'),
				],
				readonlyRootFilesystem: true,
				logging: fireLensLogDriver,

				/*
				A container listed as a dependency of another cannot be marked as essential.
				Below, we describe a dependency such that CloudQuery will only start if the singleton step succeeds.
				 */
				essential: false,
			});

			cloudqueryTask.addContainerDependencies({
				container: singletonTask,
				condition: ContainerDependencyCondition.SUCCESS,
			});

			task.addToTaskRolePolicy(singletonPolicy(cluster));
		}

		/*
		Record how often each configured table is collected.
		Skipped when no tables are configured: the VALUES list would otherwise be
		empty (or the literal text "undefined"), producing invalid SQL.
		 */
		const tables = sourceConfig.spec.tables ?? [];
		if (tables.length > 0) {
			const tableValues = tables
				// CloudQuery wildcards (`*`) become `%` in the stored table name.
				.map((table) => table.replaceAll('*', '%'))
				.map((table) => `('${table}', ${frequency})`)
				.join(',');

			task.addContainer(`${id}PostgresContainer`, {
				image: Images.postgres,
				entryPoint: [''],
				secrets: {
					PGUSER: Secret.fromSecretsManager(db.secret, 'username'),
					PGHOST: Secret.fromSecretsManager(db.secret, 'host'),
					PGPASSWORD: Secret.fromSecretsManager(db.secret, 'password'),
				},
				dockerLabels: {
					Stack: stack,
					Stage: stage,
					App: app,
					Name: name,
				},
				command: [
					'/bin/sh',
					'-c',
					[
						`psql -c "INSERT INTO cloudquery_table_frequency VALUES ${tableValues} ON CONFLICT (table_name) DO UPDATE SET frequency = ${frequency}"`,
					].join(';'),
				],
				logging: fireLensLogDriver,
				essential: false,
				readonlyRootFilesystem: true,
			});
		}

		const firelensLogRouter = task.addFirelensLogRouter(`${id}Firelens`, {
			image: Images.devxLogs,
			// The router's own logs go to CloudWatch rather than through itself.
			logging: LogDrivers.awsLogs({
				streamPrefix: [stack, stage, app].join('/'),
				logRetention: RetentionDays.ONE_DAY,
			}),
			environment: {
				STACK: stack,
				STAGE: stage,
				APP: app,
				GU_REPO: thisRepo,
			},
			firelensConfig: {
				type: FirelensLogRouterType.FLUENTBIT,
			},
			readonlyRootFilesystem: true,
		});

		// Writable mount for the log router, whose root filesystem is read-only.
		const firelensVolume: Volume = {
			name: 'firelens-volume',
		};
		task.addVolume(firelensVolume);

		firelensLogRouter.addMountPoints({
			containerPath: '/init',
			sourceVolume: firelensVolume.name,
			readOnly: false,
		});

		managedPolicies.forEach((policy) => task.taskRole.addManagedPolicy(policy));
		policies.forEach((policy) => task.addToTaskRolePolicy(policy));
		task.taskRole.addManagedPolicy(xrayPolicy);

		db.grantConnect(task.taskRole);

		super(scope, id, {
			schedule,
			cluster,
			vpc: cluster.vpc,
			subnetSelection: { subnets: cluster.vpc.privateSubnets },
			scheduledFargateTaskDefinitionOptions: {
				taskDefinition: task,
			},
			securityGroups: [dbAccess, ...additionalSecurityGroups],
			enabled,
			propagateTags: PropagatedTagSource.TASK_DEFINITION,
		});

		this.sourceConfig = sourceConfig;
	}
}