public constructor()

in src/backend/ingestion/index.ts [325:419]


  /**
   * Builds a self-continuing state machine that sends every package stored under
   * `STORAGE_KEY_PREFIX` back through ingestion.
   *
   * Each execution handles one `ListObjectsV2` page:
   *  1. list one page of objects (resuming from `$.ContinuationToken` when present);
   *  2. if there is a next page, wait a little, then start a new execution of this same
   *     state machine for it ("continue as new");
   *  3. invoke the ReIngest function for every `*${METADATA_KEY_SUFFIX}` object of the page.
   *
   * @param scope the parent construct.
   * @param id the construct id.
   * @param props the bucket holding the packages, and the queue the ReIngest function
   *              sends them to.
   */
  public constructor(scope: Construct, id: string, props: ReprocessIngestionWorkflowProps) {
    super(scope, id);

    const lambdaFunction = new ReIngest(this, 'Function', {
      architecture: gravitonLambdaIfAvailable(this),
      description: '[ConstructHub/Ingestion/ReIngest] The function used to reprocess packages through ingestion',
      environment: { BUCKET_NAME: props.bucket.bucketName, QUEUE_URL: props.queue.queueUrl },
      memorySize: 10_240,
      tracing: Tracing.ACTIVE,
      timeout: Duration.minutes(3),
    });

    // The function reads the metadata & package objects and enqueues them for ingestion.
    props.queue.grantSendMessages(lambdaFunction);
    props.bucket.grantRead(lambdaFunction, `${STORAGE_KEY_PREFIX}*${METADATA_KEY_SUFFIX}`);
    props.bucket.grantRead(lambdaFunction, `${STORAGE_KEY_PREFIX}*${PACKAGE_KEY_SUFFIX}`);

    // Need to physical-name the state machine so it can self-invoke.
    const stateMachineName = stateMachineNameFrom(this.node.path);

    // Both listing tasks are identical except for the optional ContinuationToken. The
    // `iamAction`/`iamResources` pair grants the state machine role `s3:ListBucket` on the
    // bucket, which is the only S3 permission the state machine itself needs.
    const listObjects = (taskId: string, continuationToken?: string) =>
      new CallAwsService(this, taskId, {
        service: 's3',
        action: 'listObjectsV2',
        iamAction: 's3:ListBucket',
        iamResources: [props.bucket.bucketArn],
        parameters: {
          Bucket: props.bucket.bucketName,
          // Spread keeps the key order (Bucket, ContinuationToken, Prefix) of the rendered definition.
          ...(continuationToken != null ? { ContinuationToken: continuationToken } : {}),
          Prefix: STORAGE_KEY_PREFIX,
        },
        resultPath: '$.response',
      }).addRetry({ errors: ['S3.SdkClientException'] });

    const listBucket = new Choice(this, 'Has a ContinuationToken?')
      .when(
        Condition.isPresent('$.ContinuationToken'),
        listObjects('S3.ListObjectsV2(NextPage)', JsonPath.stringAt('$.ContinuationToken')),
      )
      .otherwise(listObjects('S3.ListObjectsV2(FirstPage)'))
      .afterwards();

    // Named `processResults` (not `process`) to avoid shadowing Node's global `process`.
    const processResults = new Map(this, 'Process Result', {
      itemsPath: '$.response.Contents',
      resultPath: JsonPath.DISCARD,
    }).iterator(
      new Choice(this, 'Is metadata object?')
        .when(
          Condition.stringMatches('$.Key', `*${METADATA_KEY_SUFFIX}`),
          new LambdaInvoke(this, 'Send for reprocessing', { lambdaFunction })
            // Ample retries here... We should never fail because of throttling....
            // NOTE(review): 30 attempts at 1 min x1.1 backoff add up to roughly 2.7 hours of
            // waiting, which exceeds the 1-hour state machine timeout below, so a persistently
            // throttled page would hit the timeout before exhausting its retries — confirm intended.
            .addRetry({ errors: ['Lambda.TooManyRequestsException'], backoffRate: 1.1, interval: Duration.minutes(1), maxAttempts: 30 }),
        )
        .otherwise(new Succeed(this, 'Nothing to do')),
    );

    // The next page is handed off to a new execution *before* this page is processed, so
    // each execution only ever has one page of work. The processing step runs on both
    // branches (the Choice's default transition also leads to it).
    listBucket.next(
      new Choice(this, 'Is there more?')
        .when(
          Condition.isPresent('$.response.NextContinuationToken'),

          new Wait(this, 'Give room for on-demand work', {
            // Sleep a little before enqueuing the next batch, so that we leave room in the worker
            // pool for handling on-demand work. If we don't do this, 60k items will be queued at
            // once and live updates from NPM will struggle to get in in a reasonable time.
            time: WaitTime.duration(waitTimeBetweenReprocessBatches()),
          }).next(new StepFunctionsStartExecution(this, 'Continue as new', {
            associateWithParent: true,
            stateMachine: StateMachine.fromStateMachineArn(this, 'ThisStateMachine', Stack.of(this).formatArn({
              arnFormat: ArnFormat.COLON_RESOURCE_NAME,
              service: 'states',
              resource: 'stateMachine',
              resourceName: stateMachineName,
            })),
            input: TaskInput.fromObject({ ContinuationToken: JsonPath.stringAt('$.response.NextContinuationToken') }),
            integrationPattern: IntegrationPattern.REQUEST_RESPONSE,
            resultPath: JsonPath.DISCARD,
          }).addRetry({ errors: ['StepFunctions.ExecutionLimitExceeded'] })),
        ).afterwards({ includeOtherwise: true })
        .next(processResults),
    );

    this.stateMachine = new StateMachine(this, 'StateMachine', {
      definition: listBucket,
      stateMachineName,
      timeout: Duration.hours(1),
    });

    // No extra grants on the state machine role: its tasks (S3 listing, Lambda invoke,
    // start execution) contribute their own IAM statements. The state machine never
    // reads objects nor sends SQS messages itself — the ReIngest function does, and it
    // was granted exactly that above.
  }