in src/backend/ingestion/index.ts [106:251]
/**
 * Assembles the ingestion pipeline: the work queue, its dead-letter queue and
 * a reprocess queue; a configuration bucket holding `config.json`; the
 * ingestion handler subscribed to those queues; an optional scheduled
 * reprocessing rule; and the alarms registered with the monitoring facility.
 *
 * @param scope parent construct, forwarded to the base class.
 * @param id    construct identifier, forwarded to the base class.
 * @param props ingestion settings (storage bucket, orchestration, monitoring,
 *              and optional code artifact / package links / package tags /
 *              log retention / reprocess frequency).
 */
public constructor(scope: Construct, id: string, props: IngestionProps) {
  super(scope, id);

  const { bucket, codeArtifact, monitoring, orchestration } = props;

  // 15 minutes is used both as the handler timeout and as the visibility
  // timeout of every queue feeding it (it matches the Lambda maximum
  // execution time).
  const maxRuntime = Duration.minutes(15);

  // Settings common to all three queues created below.
  const commonQueueProps = {
    encryption: QueueEncryption.KMS_MANAGED,
    retentionPeriod: this.queueRetentionPeriod,
    visibilityTimeout: maxRuntime,
  };
  // Messages received 5 times without success are moved to the dead-letter queue.
  const redriveToDlq = () => ({ maxReceiveCount: 5, queue: this.deadLetterQueue });

  this.deadLetterQueue = new Queue(this, 'DLQ', { ...commonQueueProps });
  this.queue = new Queue(this, 'Queue', {
    ...commonQueueProps,
    deadLetterQueue: redriveToDlq(),
  });

  // Ingestion configuration, shipped to a dedicated bucket as a single JSON file.
  const configKey = 'config.json';
  const configPayload = JSON.stringify({
    packageLinks: props.packageLinks ?? [],
    packageTags: props.packageTags ?? [],
  });
  const configFile = new TempFile(configKey, configPayload);

  const configBucket = S3StorageFactory.getOrCreate(this).newBucket(this, 'ConfigBucket', {
    blockPublicAccess: BlockPublicAccess.BLOCK_ALL,
    enforceSSL: true,
    versioned: true,
  });
  new BucketDeployment(this, 'DeployIngestionConfiguration', {
    destinationBucket: configBucket,
    sources: [Source.asset(configFile.dir)],
  });

  // Environment handed to the handler; the code artifact entries are only
  // present when a code artifact repository was supplied.
  const environment: FunctionProps['environment'] = {
    AWS_EMF_ENVIRONMENT: 'Local',
    BUCKET_NAME: bucket.bucketName,
    CONFIG_BUCKET_NAME: configBucket.bucketName,
    CONFIG_FILE_KEY: configKey,
    STATE_MACHINE_ARN: orchestration.stateMachine.stateMachineArn,
  };
  if (codeArtifact) {
    Object.assign(environment, {
      CODE_ARTIFACT_REPOSITORY_ENDPOINT: codeArtifact.publishingRepositoryNpmEndpoint,
      CODE_ARTIFACT_DOMAIN_NAME: codeArtifact.repositoryDomainName,
      CODE_ARTIFACT_DOMAIN_OWNER: codeArtifact.repositoryDomainOwner,
    });
  }

  const ingestionFn = new Handler(this, 'Default', {
    description: '[ConstructHub/Ingestion] Ingests new package versions into the Construct Hub',
    environment,
    logRetention: props.logRetention ?? RetentionDays.TEN_YEARS,
    // Noted as the maximum possible setting at the time of writing.
    memorySize: 10_240,
    timeout: maxRuntime,
    tracing: Tracing.ACTIVE,
  });
  this.function = ingestionFn;

  // Permissions: read the configuration, write to the storage bucket, publish
  // to the code artifact repository (when configured) and start the
  // orchestration state machine.
  configBucket.grantRead(ingestionFn);
  bucket.grantWrite(ingestionFn);
  codeArtifact?.grantPublishToRepository(ingestionFn);
  orchestration.stateMachine.grantStartExecution(ingestionFn);

  // The handler consumes the main queue one message at a time.
  ingestionFn.addEventSource(new SqsEventSource(this.queue, { batchSize: 1 }));
  // Registered but disabled: enable it to re-process dead-letter queue messages.
  ingestionFn.addEventSource(
    new SqsEventSource(this.deadLetterQueue, { batchSize: 1, enabled: false }),
  );

  // Reprocess workflow: a dedicated queue feeding the same handler.
  const reprocessQueue = new Queue(this, 'ReprocessQueue', {
    ...commonQueueProps,
    deadLetterQueue: redriveToDlq(),
  });
  bucket.grantRead(ingestionFn, `${STORAGE_KEY_PREFIX}*${PACKAGE_KEY_SUFFIX}`);
  ingestionFn.addEventSource(new SqsEventSource(reprocessQueue, { batchSize: 1 }));
  const reprocessWorkflow = new ReprocessIngestionWorkflow(this, 'ReprocessWorkflow', {
    bucket,
    queue: reprocessQueue,
  });

  // Only when a reprocess frequency is configured: trigger the reprocess
  // workflow at that rate.
  const frequency = props.reprocessFrequency;
  if (frequency) {
    const schedule = Schedule.rate(frequency);
    const cronRule = new Rule(this, 'ReprocessCronJob', {
      schedule,
      description: 'Periodically reprocess all packages',
    });
    const input = RuleTargetInput.fromObject({
      comment: 'Scheduled reprocessing event from cron job.',
    });
    cronRule.addTarget(new SfnStateMachine(reprocessWorkflow.stateMachine, { input }));
  }

  this.grantPrincipal = ingestionFn.grantPrincipal;

  // Low-severity alarm: any message (visible or in flight) in the dead-letter queue.
  // NOTE(review): MathExpression has its own `period` setting — confirm the
  // 1-minute periods requested on the underlying metrics actually take effect.
  const oneMinute = Duration.minutes(1);
  const dlqDepth = new MathExpression({
    expression: 'm1 + m2',
    usingMetrics: {
      m1: this.deadLetterQueue.metricApproximateNumberOfMessagesVisible({ period: oneMinute }),
      m2: this.deadLetterQueue.metricApproximateNumberOfMessagesNotVisible({ period: oneMinute }),
    },
  });
  const dlqAlarmName = `${this.node.path}/DLQNotEmpty`;
  const dlqAlarmDescription = [
    'The dead-letter queue for the Ingestion function is not empty!',
    '',
    `RunBook: ${RUNBOOK_URL}`,
    '',
    `Direct link to the queue: ${sqsQueueUrl(this.deadLetterQueue)}`,
    `Direct link to the function: ${lambdaFunctionUrl(ingestionFn)}`,
  ].join('\n');
  const dlqAlarm = dlqDepth.createAlarm(this, 'DLQAlarm', {
    alarmName: dlqAlarmName,
    alarmDescription: dlqAlarmDescription,
    comparisonOperator: ComparisonOperator.GREATER_THAN_OR_EQUAL_TO_THRESHOLD,
    evaluationPeriods: 1,
    threshold: 1,
    // SQS stops emitting metrics once the queue has been empty for a while;
    // missing data therefore means "nothing in the queue", which is the good case.
    treatMissingData: TreatMissingData.NOT_BREACHING,
  });
  monitoring.addLowSeverityAlarm('Ingestion Dead-Letter Queue not empty', dlqAlarm);

  // High-severity alarm: the handler reports errors in two consecutive evaluation periods.
  const failureMetric = ingestionFn.metricErrors();
  const failureAlarmName = `${this.node.path}/Failure`;
  const failureAlarmDescription = [
    'The Ingestion function is failing!',
    '',
    `RunBook: ${RUNBOOK_URL}`,
    '',
    `Direct link to the function: ${lambdaFunctionUrl(ingestionFn)}`,
  ].join('\n');
  const failureAlarm = failureMetric.createAlarm(this, 'FailureAlarm', {
    alarmName: failureAlarmName,
    alarmDescription: failureAlarmDescription,
    comparisonOperator: ComparisonOperator.GREATER_THAN_OR_EQUAL_TO_THRESHOLD,
    evaluationPeriods: 2,
    threshold: 1,
    // Lambda only emits metrics when the function is invoked: no invocation, no errors.
    treatMissingData: TreatMissingData.NOT_BREACHING,
  });
  monitoring.addHighSeverityAlarm('Ingestion failures', failureAlarm);
}