constructor()

in packages/cdk/lib/cloudquery/task.ts [132:431]


	constructor(scope: GuStack, id: string, props: ScheduledCloudqueryTaskProps) {
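		// Builds a scheduled Fargate task that writes CloudQuery source/destination config,
		// runs a sync into the Postgres database, ships logs via FireLens, and exports traces
		// through an OpenTelemetry collector sidecar.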
		const {
			name,
			db,
			cluster,
			app,
			dbAccess,
			schedule,
			managedPolicies,
			policies,
			loggingStreamName,
			sourceConfig,
			enabled,
			secrets,
			additionalCommands = [],
			memoryLimitMiB = 512,
			cpu,
			additionalSecurityGroups = [],
			runAsSingleton,
			cloudQueryApiKey,
			dockerDistributedPluginImage,
			writeMode,
		} = props;
		const { region, stack, stage } = scope;
		const thisRepo = 'guardian/service-catalogue'; // TODO get this from GuStack
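		// How often (in milliseconds, per the helper's name) this task runs; recorded per table in Postgres below.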
		const frequency = scheduleFrequencyMs(schedule);

		const roleName = `${app}-${stage}-task-${name}`;
		const taskRole = new Role(scope, roleName, {
			assumedBy: new ServicePrincipal('ecs-tasks.amazonaws.com'),
			roleName,
		});

		const xrayPolicy = ManagedPolicy.fromAwsManagedPolicyName(
			'AWSXrayWriteOnlyAccess',
		);

		const task = new FargateTaskDefinition(scope, `${id}TaskDefinition`, {
			memoryLimitMiB,
			cpu,
			taskRole,
		});

		/*
		A scheduled task (i.e. `this`) cannot be tagged, so we tag the task definition instead.
		 */
		Tags.of(task).add('Name', name);

		const destinationConfig = postgresDestinationConfig(writeMode);

		/*
		This error should never be thrown, as the AWS CDK creates a secret by default;
		the property is just typed as optional.

		See https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_rds.DatabaseInstance.html#credentials.

		TODO: Remove this once IAM auth is working.
		 */
		if (!db.secret) {
			throw new Error('DB Secret is missing');
		}

		const fireLensLogDriver = new FireLensLogDriver({
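			// Route container logs through the FireLens log router (defined further down) to a
			// central Kinesis stream, using Fluent Bit's kinesis_streams output.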
			options: {
				Name: `kinesis_streams`,
				region,
				stream: loggingStreamName,
				retry_limit: '2',
			},
		});

		const cloudqueryTask = task.addContainer(`${id}Container`, {
			image: Images.cloudquery,
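			// Clear the image's default entrypoint so the shell command below is what runs.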
			entryPoint: [''],
			environment: {
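				// Cap the Go runtime at ~80% of the container's memory limit, leaving headroom
				// so the process is less likely to be OOM-killed by ECS.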
				GOMEMLIMIT: `${Math.floor(memoryLimitMiB * 0.8)}MiB`,
			},
			secrets: {
				...secrets,
				DB_USERNAME: Secret.fromSecretsManager(db.secret, 'username'),
				DB_HOST: Secret.fromSecretsManager(db.secret, 'host'),
				DB_PASSWORD: Secret.fromSecretsManager(db.secret, 'password'),
				CLOUDQUERY_API_KEY: cloudQueryApiKey,
			},
			dockerLabels: {
				Stack: stack,
				Stage: stage,
				App: app,
				Name: name,
			},
			readonlyRootFilesystem: true,
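			// Write the generated source and destination YAML into the shared config volume,
			// then run the CloudQuery sync against them.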
			command: [
				'/bin/sh',
				'-c',
				[
					...additionalCommands,
					`printf '${dump(sourceConfig)}' > ${serviceCatalogueConfigDirectory}/source.yaml`,
					`printf '${dump(destinationConfig)}' > ${serviceCatalogueConfigDirectory}/destination.yaml`,
					`/app/cloudquery sync ${serviceCatalogueConfigDirectory}/source.yaml ${serviceCatalogueConfigDirectory}/destination.yaml --log-format json --log-console --no-log-file`,
				].join(';'),
			],
			logging: fireLensLogDriver,
		});

		const configVolume: Volume = {
			name: 'config-volume',
		};
		task.addVolume(configVolume);

		const cqVolume: Volume = {
			name: 'cloudquery-volume',
		};
		task.addVolume(cqVolume);

		const tmpVolume: Volume = {
			name: 'tmp-volume',
		};
		task.addVolume(tmpVolume);

		cloudqueryTask.addMountPoints(
			{
				// So that we can write task config to this directory
				containerPath: serviceCatalogueConfigDirectory,
				sourceVolume: configVolume.name,
				readOnly: false,
			},
			{
				// So that CloudQuery can write to this directory
				containerPath: '/app/.cq',
				sourceVolume: cqVolume.name,
				readOnly: false,
			},
			{
				// So that CloudQuery can write temporary data
				containerPath: '/tmp',
				sourceVolume: tmpVolume.name,
				readOnly: false,
			},
		);

		const otel = task.addContainer(`${id}AWSOTELCollector`, {
			image: Images.otelCollector,
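			// The ecs-xray collector config receives OTLP traffic (port 4318) and, per its
			// name, exports traces to AWS X-Ray; the task role gains AWSXrayWriteOnlyAccess
			// below to permit this.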
			command: ['--config=/etc/ecs/ecs-xray.yaml'],
			logging: fireLensLogDriver,
			healthCheck: {
				command: ['CMD', '/healthcheck'],
				interval: Duration.seconds(5),
			},
			portMappings: [
				{
					containerPort: 4318,
				},
			],
			readonlyRootFilesystem: true,
		});

		cloudqueryTask.addContainerDependencies({
			container: otel,
			condition: ContainerDependencyCondition.HEALTHY,
		});

		if (dockerDistributedPluginImage) {
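			// Optionally run a CloudQuery plugin distributed as a Docker image alongside the
			// main container, which waits for it to have started before syncing.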
			const additionalCloudQueryContainer = task.addContainer(
				`${id}PluginContainer`,
				{
					image: dockerDistributedPluginImage,
					logging: fireLensLogDriver,
					essential: false,
					readonlyRootFilesystem: true,
				},
			);

			cloudqueryTask.addContainerDependencies({
				container: additionalCloudQueryContainer,
				condition: ContainerDependencyCondition.START,
			});
		}

		if (runAsSingleton) {
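			// Singleton guard: query the ECS task metadata endpoint for this task's cluster and
			// family, count running tasks of the same family, and exit 114 ("operation in
			// progress") if another instance is already running. CloudQuery only starts if this
			// container exits successfully.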
			const operationInProgress = 114;
			const success = 0;

			const singletonTask = task.addContainer(`${id}AwsCli`, {
				image: Images.singletonImage,
				entryPoint: [''],
				command: [
					'/bin/bash',
					'-c',
					[
						// Who am I?
						`ECS_CLUSTER=$(curl -s $ECS_CONTAINER_METADATA_URI/task | jq -r '.Cluster')`,
						`ECS_FAMILY=$(curl -s $ECS_CONTAINER_METADATA_URI/task | jq -r '.Family')`,
						`ECS_TASK_ARN=$(curl -s $ECS_CONTAINER_METADATA_URI/task | jq -r '.TaskARN')`,

						// How many more of me are there?
						`RUNNING=$(aws ecs list-tasks --cluster $ECS_CLUSTER --family $ECS_FAMILY | jq '.taskArns | length')`,

						// Exit zero (successful) if I'm the only one running
						`[[ $\{RUNNING} > 1 ]] && exit ${operationInProgress} || exit ${success}`,
					].join(';'),
				],
				readonlyRootFilesystem: true,
				logging: fireLensLogDriver,

				/*
				A container listed as a dependency of another cannot be marked as essential.
				Below, we describe a dependency such that CloudQuery will only start if the singleton step succeeds.
				 */
				essential: false,
			});

			cloudqueryTask.addContainerDependencies({
				container: singletonTask,
				condition: ContainerDependencyCondition.SUCCESS,
			});

			task.addToTaskRolePolicy(singletonPolicy(cluster));
		}

		const tableValues = sourceConfig.spec.tables
			?.map((table) => table.replaceAll('*', '%'))
			.map((table) => `('${table}', ${frequency})`)
			.join(',');

		task.addContainer(`${id}PostgresContainer`, {
			image: Images.postgres,
			entryPoint: [''],
			secrets: {
				PGUSER: Secret.fromSecretsManager(db.secret, 'username'),
				PGHOST: Secret.fromSecretsManager(db.secret, 'host'),
				PGPASSWORD: Secret.fromSecretsManager(db.secret, 'password'),
			},
			dockerLabels: {
				Stack: stack,
				Stage: stage,
				App: app,
				Name: name,
			},
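			// Upsert one row per table pattern ('*' rewritten to '%' above) into
			// cloudquery_table_frequency, recording how often each table is expected to sync.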
			command: [
				'/bin/sh',
				'-c',
				[
					`psql -c "INSERT INTO cloudquery_table_frequency VALUES ${tableValues} ON CONFLICT (table_name) DO UPDATE SET frequency = ${frequency}"`,
				].join(';'),
			],
			logging: fireLensLogDriver,
			essential: false,
			readonlyRootFilesystem: true,
		});

		const firelensLogRouter = task.addFirelensLogRouter(`${id}Firelens`, {
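			// Fluent Bit log router that the FireLens drivers above send to; the router's own
			// logs go to CloudWatch with one-day retention.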
			image: Images.devxLogs,
			logging: LogDrivers.awsLogs({
				streamPrefix: [stack, stage, app].join('/'),
				logRetention: RetentionDays.ONE_DAY,
			}),
			environment: {
				STACK: stack,
				STAGE: stage,
				APP: app,
				GU_REPO: thisRepo,
			},
			firelensConfig: {
				type: FirelensLogRouterType.FLUENTBIT,
			},
			readonlyRootFilesystem: true,
		});

		const firelensVolume: Volume = {
			name: 'firelens-volume',
		};
		task.addVolume(firelensVolume);

		firelensLogRouter.addMountPoints({
			containerPath: '/init',
			sourceVolume: firelensVolume.name,
			readOnly: false,
		});

		managedPolicies.forEach((policy) => task.taskRole.addManagedPolicy(policy));
		policies.forEach((policy) => task.addToTaskRolePolicy(policy));
		task.taskRole.addManagedPolicy(xrayPolicy);

		db.grantConnect(task.taskRole);

		super(scope, id, {
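			// Register the scheduled task itself: run on the given schedule in the cluster's
			// private subnets, with database access, propagating tags (including Name) from the
			// task definition.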
			schedule,
			cluster,
			vpc: cluster.vpc,
			subnetSelection: { subnets: cluster.vpc.privateSubnets },
			scheduledFargateTaskDefinitionOptions: {
				taskDefinition: task,
			},
			securityGroups: [dbAccess, ...additionalSecurityGroups],
			enabled,
			propagateTags: PropagatedTagSource.TASK_DEFINITION,
		});

		this.sourceConfig = sourceConfig;
	}