public constructor()

in src/backend/ingestion/index.ts [106:251]


  /**
   * Wires up the ingestion pipeline: the ingestion queue and its dead-letter
   * queue, a versioned configuration bucket seeded with `config.json`, the
   * handler function subscribed to the queues, the reprocess workflow (with
   * an optional schedule), and the alarms registered with `props.monitoring`.
   *
   * @param scope the construct this one is created under.
   * @param id    the identifier of this construct within `scope`.
   * @param props the package bucket, orchestration and monitoring to integrate
   *              with, plus optional settings (CodeArtifact, package links and
   *              tags, log retention, reprocess frequency).
   */
  public constructor(scope: Construct, id: string, props: IngestionProps) {
    super(scope, id);

    // The handler timeout and every queue's visibility timeout use the same
    // 15 minute duration, so they are derived from a single value.
    const fifteenMinutes = Duration.minutes(15);

    // Settings common to all three queues created by this construct.
    const commonQueueProps = {
      encryption: QueueEncryption.KMS_MANAGED,
      retentionPeriod: this.queueRetentionPeriod,
      visibilityTimeout: fifteenMinutes,
    };

    const dlq = new Queue(this, 'DLQ', { ...commonQueueProps });
    this.deadLetterQueue = dlq;

    // Messages are moved to the dead-letter queue after 5 failed receives.
    const redrivePolicy = { maxReceiveCount: 5, queue: dlq };

    const ingestionQueue = new Queue(this, 'Queue', {
      ...commonQueueProps,
      deadLetterQueue: redrivePolicy,
    });
    this.queue = ingestionQueue;

    // Serialize the package links & tags configuration to a temporary file,
    // so that it can be deployed to the configuration bucket as an asset.
    const configFilename = 'config.json';
    const configDocument = JSON.stringify({
      packageLinks: props.packageLinks ?? [],
      packageTags: props.packageTags ?? [],
    });
    const config = new TempFile(configFilename, configDocument);

    const configBucket = S3StorageFactory.getOrCreate(this).newBucket(this, 'ConfigBucket', {
      blockPublicAccess: BlockPublicAccess.BLOCK_ALL,
      enforceSSL: true,
      versioned: true,
    });

    new BucketDeployment(this, 'DeployIngestionConfiguration', {
      destinationBucket: configBucket,
      sources: [Source.asset(config.dir)],
    });

    // The CodeArtifact variables are only present when a repository is configured.
    const codeArtifact = props.codeArtifact;
    const environment: FunctionProps['environment'] = {
      AWS_EMF_ENVIRONMENT: 'Local',
      BUCKET_NAME: props.bucket.bucketName,
      CONFIG_BUCKET_NAME: configBucket.bucketName,
      CONFIG_FILE_KEY: configFilename,
      STATE_MACHINE_ARN: props.orchestration.stateMachine.stateMachineArn,
      ...(codeArtifact
        ? {
          CODE_ARTIFACT_REPOSITORY_ENDPOINT: codeArtifact.publishingRepositoryNpmEndpoint,
          CODE_ARTIFACT_DOMAIN_NAME: codeArtifact.repositoryDomainName,
          CODE_ARTIFACT_DOMAIN_OWNER: codeArtifact.repositoryDomainOwner,
        }
        : {}),
    };

    const ingestionFn = new Handler(this, 'Default', {
      description: '[ConstructHub/Ingestion] Ingests new package versions into the Construct Hub',
      environment,
      logRetention: props.logRetention ?? RetentionDays.TEN_YEARS,
      memorySize: 10_240, // Currently the maximum possible setting
      timeout: fifteenMinutes,
      tracing: Tracing.ACTIVE,
    });
    this.function = ingestionFn;

    // Permissions required by the handler.
    configBucket.grantRead(ingestionFn);
    props.bucket.grantWrite(ingestionFn);
    if (codeArtifact) {
      codeArtifact.grantPublishToRepository(ingestionFn);
    }
    props.orchestration.stateMachine.grantStartExecution(ingestionFn);

    // Messages are delivered to the handler one at a time.
    ingestionFn.addEventSource(new SqsEventSource(ingestionQueue, { batchSize: 1 }));
    // The dead-letter queue subscription is created disabled; it can be enabled
    // in order to re-process dead-letter-queue messages.
    ingestionFn.addEventSource(new SqsEventSource(dlq, { batchSize: 1, enabled: false }));

    // Reprocess workflow: a dedicated queue, also consumed by the handler, that
    // is handed to the ReprocessIngestionWorkflow together with the bucket.
    const reprocessQueue = new Queue(this, 'ReprocessQueue', {
      ...commonQueueProps,
      deadLetterQueue: redrivePolicy,
    });
    props.bucket.grantRead(ingestionFn, `${STORAGE_KEY_PREFIX}*${PACKAGE_KEY_SUFFIX}`);
    ingestionFn.addEventSource(new SqsEventSource(reprocessQueue, { batchSize: 1 }));
    const reprocessWorkflow = new ReprocessIngestionWorkflow(this, 'ReprocessWorkflow', {
      bucket: props.bucket,
      queue: reprocessQueue,
    });

    // When a reprocess frequency is configured, start the reprocess workflow
    // at that rate. No schedule is created otherwise.
    const reprocessEvery = props.reprocessFrequency;
    if (reprocessEvery) {
      const schedule = new Rule(this, 'ReprocessCronJob', {
        schedule: Schedule.rate(reprocessEvery),
        description: 'Periodically reprocess all packages',
      });
      const scheduledInput = RuleTargetInput.fromObject({
        comment: 'Scheduled reprocessing event from cron job.',
      });
      schedule.addTarget(new SfnStateMachine(reprocessWorkflow.stateMachine, { input: scheduledInput }));
    }

    this.grantPrincipal = ingestionFn.grantPrincipal;

    // Both alarms fire as soon as the watched metric reaches 1, and consider
    // missing data points as healthy.
    const alarmOnAnyOccurrence = {
      comparisonOperator: ComparisonOperator.GREATER_THAN_OR_EQUAL_TO_THRESHOLD,
      threshold: 1,
      treatMissingData: TreatMissingData.NOT_BREACHING,
    };

    // Total dead-letter queue depth: visible plus in-flight messages.
    const everyMinute = { period: Duration.minutes(1) };
    const dlqDepth = new MathExpression({
      expression: 'm1 + m2',
      usingMetrics: {
        m1: dlq.metricApproximateNumberOfMessagesVisible(everyMinute),
        m2: dlq.metricApproximateNumberOfMessagesNotVisible(everyMinute),
      },
    });
    // SQS does not emit metrics if the queue has been empty for a while, which
    // is GOOD (hence missing data is not breaching).
    const dlqAlarm = dlqDepth.createAlarm(this, 'DLQAlarm', {
      ...alarmOnAnyOccurrence,
      alarmName: `${this.node.path}/DLQNotEmpty`,
      alarmDescription: [
        'The dead-letter queue for the Ingestion function is not empty!',
        '',
        `RunBook: ${RUNBOOK_URL}`,
        '',
        `Direct link to the queue: ${sqsQueueUrl(dlq)}`,
        `Direct link to the function: ${lambdaFunctionUrl(ingestionFn)}`,
      ].join('\n'),
      evaluationPeriods: 1,
    });
    props.monitoring.addLowSeverityAlarm('Ingestion Dead-Letter Queue not empty', dlqAlarm);

    // Lambda only emits metrics when the function is invoked.
    // No invocation => no errors (hence missing data is not breaching).
    const failureAlarm = ingestionFn.metricErrors().createAlarm(this, 'FailureAlarm', {
      ...alarmOnAnyOccurrence,
      alarmName: `${this.node.path}/Failure`,
      alarmDescription: [
        'The Ingestion function is failing!',
        '',
        `RunBook: ${RUNBOOK_URL}`,
        '',
        `Direct link to the function: ${lambdaFunctionUrl(ingestionFn)}`,
      ].join('\n'),
      evaluationPeriods: 2,
    });
    props.monitoring.addHighSeverityAlarm('Ingestion failures', failureAlarm);
  }