in packages/cdk/lib/cloudquery/task.ts [132:431]
/**
 * Builds a scheduled Fargate task that runs a single `cloudquery sync`.
 *
 * The task definition contains:
 * - the CloudQuery container, which writes its source and destination config
 *   to a shared volume and then runs `cloudquery sync` with those two files;
 * - an OpenTelemetry collector, which CloudQuery waits on until it is HEALTHY;
 * - optionally, a docker-distributed plugin container, which CloudQuery waits
 *   on until it has STARTed;
 * - optionally (`runAsSingleton`), an AWS CLI container that exits with 114 if
 *   more than one task of this family is running, and with 0 otherwise;
 *   CloudQuery only starts if it exits successfully;
 * - a non-essential Postgres container that upserts the sync frequency of each
 *   configured table into `cloudquery_table_frequency` (only added when the
 *   source config lists at least one table);
 * - a FireLens (Fluent Bit) log router that all of the above log through.
 *
 * @throws Error if the database instance does not expose a credentials secret.
 */
constructor(scope: GuStack, id: string, props: ScheduledCloudqueryTaskProps) {
  const {
    name,
    db,
    cluster,
    app,
    dbAccess,
    schedule,
    managedPolicies,
    policies,
    loggingStreamName,
    sourceConfig,
    enabled,
    secrets,
    additionalCommands = [],
    memoryLimitMiB = 512,
    cpu,
    additionalSecurityGroups = [],
    runAsSingleton,
    cloudQueryApiKey,
    dockerDistributedPluginImage,
    writeMode,
  } = props;
  const { region, stack, stage } = scope;
  const thisRepo = 'guardian/service-catalogue'; // TODO get this from GuStack
  const frequency = scheduleFrequencyMs(schedule);

  /*
  Quote a value for `/bin/sh`: wrap it in single quotes and escape any single
  quotes it contains. js-yaml's `dump` emits single-quoted scalars (e.g. `'*'`),
  which would otherwise terminate the surrounding shell quote.
  */
  const shellQuote = (value: string) =>
    `'${value.replaceAll("'", "'\\''")}'`;

  /*
  Shell command that writes `contents` verbatim to `path`. The contents are the
  argument to `%s` rather than printf's format string, so any `%` or `\` they
  contain is not interpreted as a conversion or escape sequence.
  */
  const writeFileCommand = (contents: string, path: string) =>
    `printf '%s' ${shellQuote(contents)} > ${path}`;

  const roleName = `${app}-${stage}-task-${name}`;
  const taskRole = new Role(scope, roleName, {
    assumedBy: new ServicePrincipal('ecs-tasks.amazonaws.com'),
    roleName,
  });
  const xrayPolicy = ManagedPolicy.fromAwsManagedPolicyName(
    'AWSXrayWriteOnlyAccess',
  );
  const task = new FargateTaskDefinition(scope, `${id}TaskDefinition`, {
    memoryLimitMiB,
    cpu,
    taskRole,
  });
  /*
  A scheduled task (i.e. `this`) cannot be tagged, so we tag the task definition instead.
  */
  Tags.of(task).add('Name', name);
  const destinationConfig = postgresDestinationConfig(writeMode);
  /*
  This error shouldn't ever be thrown as AWS CDK creates a secret by default,
  it is just typed as optional.
  See https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_rds.DatabaseInstance.html#credentials.
  TODO: Remove this once IAM auth is working.
  */
  if (!db.secret) {
    throw new Error('DB Secret is missing');
  }
  // Every container except the log router itself ships its logs to Kinesis via FireLens.
  const fireLensLogDriver = new FireLensLogDriver({
    options: {
      Name: `kinesis_streams`,
      region,
      stream: loggingStreamName,
      retry_limit: '2',
    },
  });
  const cloudqueryTask = task.addContainer(`${id}Container`, {
    image: Images.cloudquery,
    entryPoint: [''],
    environment: {
      // Soft Go heap limit at 80% of the task memory, leaving headroom below the hard limit.
      GOMEMLIMIT: `${Math.floor(memoryLimitMiB * 0.8)}MiB`,
    },
    secrets: {
      ...secrets,
      DB_USERNAME: Secret.fromSecretsManager(db.secret, 'username'),
      DB_HOST: Secret.fromSecretsManager(db.secret, 'host'),
      DB_PASSWORD: Secret.fromSecretsManager(db.secret, 'password'),
      CLOUDQUERY_API_KEY: cloudQueryApiKey,
    },
    dockerLabels: {
      Stack: stack,
      Stage: stage,
      App: app,
      Name: name,
    },
    readonlyRootFilesystem: true,
    command: [
      '/bin/sh',
      '-c',
      [
        ...additionalCommands,
        writeFileCommand(
          dump(sourceConfig),
          `${serviceCatalogueConfigDirectory}/source.yaml`,
        ),
        writeFileCommand(
          dump(destinationConfig),
          `${serviceCatalogueConfigDirectory}/destination.yaml`,
        ),
        `/app/cloudquery sync ${serviceCatalogueConfigDirectory}/source.yaml ${serviceCatalogueConfigDirectory}/destination.yaml --log-format json --log-console --no-log-file`,
      ].join(';'),
    ],
    logging: fireLensLogDriver,
  });
  const configVolume: Volume = {
    name: 'config-volume',
  };
  task.addVolume(configVolume);
  const cqVolume: Volume = {
    name: 'cloudquery-volume',
  };
  task.addVolume(cqVolume);
  const tmpVolume: Volume = {
    name: 'tmp-volume',
  };
  task.addVolume(tmpVolume);
  // The root filesystem is read-only, so every path CloudQuery writes to needs a volume.
  cloudqueryTask.addMountPoints(
    {
      // So that we can write task config to this directory
      containerPath: serviceCatalogueConfigDirectory,
      sourceVolume: configVolume.name,
      readOnly: false,
    },
    {
      // So that Cloudquery can write to this directory
      containerPath: '/app/.cq',
      sourceVolume: cqVolume.name,
      readOnly: false,
    },
    {
      // So that Cloudquery can write temporary data
      containerPath: '/tmp',
      sourceVolume: tmpVolume.name,
      readOnly: false,
    },
  );
  const otel = task.addContainer(`${id}AWSOTELCollector`, {
    image: Images.otelCollector,
    command: ['--config=/etc/ecs/ecs-xray.yaml'],
    logging: fireLensLogDriver,
    healthCheck: {
      command: ['CMD', '/healthcheck'],
      interval: Duration.seconds(5),
    },
    portMappings: [
      {
        containerPort: 4318,
      },
    ],
    readonlyRootFilesystem: true,
  });
  cloudqueryTask.addContainerDependencies({
    container: otel,
    condition: ContainerDependencyCondition.HEALTHY,
  });
  if (dockerDistributedPluginImage) {
    const additionalCloudQueryContainer = task.addContainer(
      `${id}PluginContainer`,
      {
        image: dockerDistributedPluginImage,
        logging: fireLensLogDriver,
        essential: false,
        readonlyRootFilesystem: true,
      },
    );
    cloudqueryTask.addContainerDependencies({
      container: additionalCloudQueryContainer,
      condition: ContainerDependencyCondition.START,
    });
  }
  if (runAsSingleton) {
    const operationInProgress = 114;
    const success = 0;
    const singletonTask = task.addContainer(`${id}AwsCli`, {
      image: Images.singletonImage,
      entryPoint: [''],
      command: [
        '/bin/bash',
        '-c',
        [
          // Who am I?
          `ECS_CLUSTER=$(curl -s $ECS_CONTAINER_METADATA_URI/task | jq -r '.Cluster')`,
          `ECS_FAMILY=$(curl -s $ECS_CONTAINER_METADATA_URI/task | jq -r '.Family')`,
          `ECS_TASK_ARN=$(curl -s $ECS_CONTAINER_METADATA_URI/task | jq -r '.TaskARN')`,
          // How many more of me are there?
          `RUNNING=$(aws ecs list-tasks --cluster $ECS_CLUSTER --family $ECS_FAMILY | jq '.taskArns | length')`,
          // Exit zero (successful) if I'm the only one running.
          // `-gt` compares numerically; `>` inside `[[ ]]` would compare strings.
          `[[ $\{RUNNING} -gt 1 ]] && exit ${operationInProgress} || exit ${success}`,
        ].join(';'),
      ],
      readonlyRootFilesystem: true,
      logging: fireLensLogDriver,
      /*
      A container listed as a dependency of another cannot be marked as essential.
      Below, we describe a dependency such that CloudQuery will only start if the singleton step succeeds.
      */
      essential: false,
    });
    cloudqueryTask.addContainerDependencies({
      container: singletonTask,
      condition: ContainerDependencyCondition.SUCCESS,
    });
    task.addToTaskRolePolicy(singletonPolicy(cluster));
  }
  /*
  Record how often each table is synced. Table globs (`*`) become SQL LIKE
  wildcards (`%`), and embedded single quotes are doubled so each name is a
  valid SQL string literal. With no tables there is nothing to insert: an
  empty VALUES list would make the INSERT statement invalid, so the container
  is omitted entirely.
  */
  const tables = sourceConfig.spec.tables ?? [];
  if (tables.length > 0) {
    const tableValues = tables
      .map((table) => table.replaceAll('*', '%').replaceAll("'", "''"))
      .map((table) => `('${table}', ${frequency})`)
      .join(',');
    task.addContainer(`${id}PostgresContainer`, {
      image: Images.postgres,
      entryPoint: [''],
      secrets: {
        PGUSER: Secret.fromSecretsManager(db.secret, 'username'),
        PGHOST: Secret.fromSecretsManager(db.secret, 'host'),
        PGPASSWORD: Secret.fromSecretsManager(db.secret, 'password'),
      },
      dockerLabels: {
        Stack: stack,
        Stage: stage,
        App: app,
        Name: name,
      },
      command: [
        '/bin/sh',
        '-c',
        [
          `psql -c "INSERT INTO cloudquery_table_frequency VALUES ${tableValues} ON CONFLICT (table_name) DO UPDATE SET frequency = ${frequency}"`,
        ].join(';'),
      ],
      logging: fireLensLogDriver,
      essential: false,
      readonlyRootFilesystem: true,
    });
  }
  const firelensLogRouter = task.addFirelensLogRouter(`${id}Firelens`, {
    image: Images.devxLogs,
    logging: LogDrivers.awsLogs({
      streamPrefix: [stack, stage, app].join('/'),
      logRetention: RetentionDays.ONE_DAY,
    }),
    environment: {
      STACK: stack,
      STAGE: stage,
      APP: app,
      GU_REPO: thisRepo,
    },
    firelensConfig: {
      type: FirelensLogRouterType.FLUENTBIT,
    },
    readonlyRootFilesystem: true,
  });
  const firelensVolume: Volume = {
    name: 'firelens-volume',
  };
  task.addVolume(firelensVolume);
  // Writable mount for the log router, whose root filesystem is read-only.
  firelensLogRouter.addMountPoints({
    containerPath: '/init',
    sourceVolume: firelensVolume.name,
    readOnly: false,
  });
  managedPolicies.forEach((policy) => task.taskRole.addManagedPolicy(policy));
  policies.forEach((policy) => task.addToTaskRolePolicy(policy));
  task.taskRole.addManagedPolicy(xrayPolicy);
  db.grantConnect(task.taskRole);
  super(scope, id, {
    schedule,
    cluster,
    vpc: cluster.vpc,
    subnetSelection: { subnets: cluster.vpc.privateSubnets },
    scheduledFargateTaskDefinitionOptions: {
      taskDefinition: task,
    },
    securityGroups: [dbAccess, ...additionalSecurityGroups],
    enabled,
    propagateTags: PropagatedTagSource.TASK_DEFINITION,
  });
  this.sourceConfig = sourceConfig;
}