in src/backend/ingestion/index.ts [325:419]
/**
 * Builds a workflow that re-sends every already-stored package through the
 * ingestion queue. The state machine pages through the storage bucket with
 * `S3.ListObjectsV2`, maps over each result page, and invokes the re-ingest
 * Lambda for every metadata object it finds. Pagination across executions is
 * done "continue-as-new" style: when a `NextContinuationToken` is present, the
 * state machine starts a fresh execution of itself with that token as input.
 *
 * @param scope the parent Construct.
 * @param id    the construct's logical id within `scope`.
 * @param props provides the storage bucket to list and the ingestion queue to
 *              send re-processing messages to.
 */
public constructor(scope: Construct, id: string, props: ReprocessIngestionWorkflowProps) {
super(scope, id);
// Lambda that reads a stored package's metadata/tarball from the bucket and
// re-enqueues it on the ingestion queue (bucket/queue passed via environment).
const lambdaFunction = new ReIngest(this, 'Function', {
architecture: gravitonLambdaIfAvailable(this),
description: '[ConstructHub/Ingestion/ReIngest] The function used to reprocess packages through ingestion',
environment: { BUCKET_NAME: props.bucket.bucketName, QUEUE_URL: props.queue.queueUrl },
memorySize: 10_240,
tracing: Tracing.ACTIVE,
timeout: Duration.minutes(3),
});
// The function only needs to read the metadata and package-tarball objects
// under the storage prefix, and to send messages to the ingestion queue.
props.queue.grantSendMessages(lambdaFunction);
props.bucket.grantRead(lambdaFunction, `${STORAGE_KEY_PREFIX}*${METADATA_KEY_SUFFIX}`);
props.bucket.grantRead(lambdaFunction, `${STORAGE_KEY_PREFIX}*${PACKAGE_KEY_SUFFIX}`);
// Need to physical-name the state machine so it can self-invoke.
const stateMachineName = stateMachineNameFrom(this.node.path);
// First state: list one page of objects under the storage prefix. When the
// execution was started with a ContinuationToken (i.e. it is a continuation
// of a previous execution), resume the listing from that token; otherwise
// start from the first page. Both branches retry on transient S3 SDK errors.
const listBucket = new Choice(this, 'Has a ContinuationToken?')
.when(Condition.isPresent('$.ContinuationToken'),
new CallAwsService(this, 'S3.ListObjectsV2(NextPage)', {
service: 's3',
action: 'listObjectsV2',
iamAction: 's3:ListBucket',
iamResources: [props.bucket.bucketArn],
parameters: {
Bucket: props.bucket.bucketName,
ContinuationToken: JsonPath.stringAt('$.ContinuationToken'),
Prefix: STORAGE_KEY_PREFIX,
},
resultPath: '$.response',
}).addRetry({ errors: ['S3.SdkClientException'] }))
.otherwise(new CallAwsService(this, 'S3.ListObjectsV2(FirstPage)', {
service: 's3',
action: 'listObjectsV2',
iamAction: 's3:ListBucket',
iamResources: [props.bucket.bucketArn],
parameters: {
Bucket: props.bucket.bucketName,
Prefix: STORAGE_KEY_PREFIX,
},
resultPath: '$.response',
}).addRetry({ errors: ['S3.SdkClientException'] })).afterwards();
// Map over the listed objects; only keys ending in the metadata suffix are
// sent to the re-ingest Lambda (other objects — tarballs, docs — are skipped).
const process = new Map(this, 'Process Result', {
itemsPath: '$.response.Contents',
resultPath: JsonPath.DISCARD,
}).iterator(
new Choice(this, 'Is metadata object?')
.when(
Condition.stringMatches('$.Key', `*${METADATA_KEY_SUFFIX}`),
new LambdaInvoke(this, 'Send for reprocessing', { lambdaFunction })
// Ample retries here... We should never fail because of throttling....
.addRetry({ errors: ['Lambda.TooManyRequestsException'], backoffRate: 1.1, interval: Duration.minutes(1), maxAttempts: 30 }),
)
.otherwise(new Succeed(this, 'Nothing to do')),
);
// If the listing is truncated, wait a bit and then start a new execution of
// this very state machine (located via its physical name, since a construct
// cannot reference its own not-yet-created ARN) carrying the continuation
// token forward. Processing of the current page happens afterwards in both
// the truncated and the final-page case.
listBucket.next(
new Choice(this, 'Is there more?')
.when(
Condition.isPresent('$.response.NextContinuationToken'),
new Wait(this, 'Give room for on-demand work', {
// Sleep a little before enqueuing the next batch, so that we leave room in the worker
// pool for handling on-demand work. If we don't do this, 60k items will be queued at
// once and live updates from NPM will struggle to get in in a reasonable time.
time: WaitTime.duration(waitTimeBetweenReprocessBatches()),
}).next(new StepFunctionsStartExecution(this, 'Continue as new', {
associateWithParent: true,
stateMachine: StateMachine.fromStateMachineArn(this, 'ThisStateMachine', Stack.of(this).formatArn({
arnFormat: ArnFormat.COLON_RESOURCE_NAME,
service: 'states',
resource: 'stateMachine',
resourceName: stateMachineName,
})),
input: TaskInput.fromObject({ ContinuationToken: JsonPath.stringAt('$.response.NextContinuationToken') }),
// REQUEST_RESPONSE: fire off the child execution without waiting for it
// to complete, so this execution can finish processing its own page.
integrationPattern: IntegrationPattern.REQUEST_RESPONSE,
resultPath: JsonPath.DISCARD,
}).addRetry({ errors: ['StepFunctions.ExecutionLimitExceeded'] })),
).afterwards({ includeOtherwise: true })
.next(process),
);
// Each execution handles a single page, so a 1 hour timeout bounds it; the
// overall reprocess run spans multiple chained executions.
this.stateMachine = new StateMachine(this, 'StateMachine', {
definition: listBucket,
stateMachineName,
timeout: Duration.hours(1),
});
// The state machine itself needs to list/read the bucket (ListObjectsV2) and
// send messages to the queue. NOTE(review): queue send looks broader than the
// visible states require — presumably needed elsewhere; confirm before trimming.
props.bucket.grantRead(this.stateMachine);
props.queue.grantSendMessages(this.stateMachine);
}