in src/backend/orchestration/index.ts [465:617]
/**
 * Builds the state machines that walk the bucket's key hierarchy and start
 * the orchestration workflow once per version prefix that is found.
 *
 * Three paginated S3 listings (all using `/` as the delimiter) are chained:
 *
 * 1. The top-level machine (`Resource`) lists the prefixes directly under
 *    `STORAGE_KEY_PREFIX`. A prefix matching `${STORAGE_KEY_PREFIX}@*` is
 *    treated as an npm scope and gets one more listing level (done inline);
 *    any other prefix is handed straight to the `PerPackage` machine.
 * 2. The inline "@scope" branch lists the prefixes under the scope and starts
 *    one `PerPackage` execution for each of them.
 * 3. The `PerPackage` machine lists the prefixes under a package prefix and
 *    starts `props.stateMachine` for each of them.
 *
 * Every listing stores its result at `$.response`; the presence of
 * `$.response.NextContinuationToken` is what drives fetching the next page.
 *
 * @param scope the parent construct.
 * @param id    the construct id of this construct within `scope`.
 * @param props supplies the bucket to enumerate and the state machine to
 *              start for each version prefix.
 */
public constructor(scope: Construct, id: string, props: RegenerateAllDocumentationProps) {
  super(scope, id);

  // Creates one `s3:listObjectsV2` task that lists the `/`-delimited
  // sub-prefixes of `prefix` and stores the response at `$.response`.
  // When `continued` is true, the previous page's continuation token is sent.
  const listPrefixes = (taskId: string, prefix: string, continued: boolean) =>
    new tasks.CallAwsService(this, taskId, {
      service: 's3',
      action: 'listObjectsV2',
      iamAction: 's3:ListBucket',
      iamResources: [props.bucket.bucketArn],
      parameters: {
        Bucket: props.bucket.bucketName,
        ...(continued
          ? { ContinuationToken: JsonPath.stringAt('$.response.NextContinuationToken') }
          : {}),
        Delimiter: '/',
        Prefix: prefix,
      },
      resultPath: '$.response',
    }).addRetry({ errors: ['S3.SdkClientException'] });

  // Builds the `States.Format` expression that appends `suffix`, minus its
  // first character, to the current `$.Prefix`.
  // NOTE(review): presumably the *_KEY_SUFFIX constants start with '/' and the
  // listed prefixes already end with one — confirm against their definitions.
  const objectKey = (suffix: string) =>
    JsonPath.stringAt(`States.Format('{}${suffix.slice(1)}', $.Prefix)`);

  // ---- Per-package machine: one orchestration run per version prefix ----
  const processVersions = new Choice(this, 'Get package versions page')
    .when(
      Condition.isPresent('$.response.NextContinuationToken'),
      listPrefixes('Next versions page', JsonPath.stringAt('$.Prefix'), true),
    )
    .otherwise(listPrefixes('First versions page', JsonPath.stringAt('$.Prefix'), false))
    .afterwards()
    .next(
      new Map(this, 'For each key prefix', { itemsPath: '$.response.CommonPrefixes', resultPath: JsonPath.DISCARD })
        .iterator(
          new tasks.StepFunctionsStartExecution(this, 'Start Orchestration Workflow', {
            stateMachine: props.stateMachine,
            associateWithParent: true,
            input: TaskInput.fromObject({
              bucket: props.bucket.bucketName,
              assembly: { key: objectKey(ASSEMBLY_KEY_SUFFIX) },
              metadata: { key: objectKey(METADATA_KEY_SUFFIX) },
              package: { key: objectKey(PACKAGE_KEY_SUFFIX) },
            }),
            // REQUEST_RESPONSE: the task completes once the execution has been
            // started; it does not wait for the orchestration run to finish.
            integrationPattern: IntegrationPattern.REQUEST_RESPONSE,
          }).addRetry({ errors: ['StepFunctions.ExecutionLimitExceeded'] }),
        ),
    );
  // Loop back to the listing while S3 reports that more pages are available.
  processVersions.next(
    new Choice(this, 'Has more versions?')
      .when(Condition.isPresent('$.response.NextContinuationToken'), processVersions)
      .otherwise(new Succeed(this, 'Success')),
  );

  const processPackageVersions = new StateMachine(this, 'PerPackage', {
    definition: processVersions,
    timeout: Duration.hours(1),
    tracingEnabled: true,
  });

  // Creates a task that runs the per-package machine for the current
  // `$.Prefix`. RUN_JOB makes the task wait for the child execution to end.
  const startPerPackage = (taskId: string) =>
    new tasks.StepFunctionsStartExecution(this, taskId, {
      stateMachine: processPackageVersions,
      associateWithParent: true,
      input: TaskInput.fromObject({
        Prefix: JsonPath.stringAt('$.Prefix'),
      }),
      integrationPattern: IntegrationPattern.RUN_JOB,
    }).addRetry({ errors: ['StepFunctions.ExecutionLimitExceeded'] });

  // This workflow is broken into two sub-workflows because otherwise it hits the 25K events limit
  // of StepFunction executions relatively quickly.
  const processNamespace = new Choice(this, 'Get @scope page')
    .when(
      Condition.isPresent('$.response.NextContinuationToken'),
      listPrefixes('Next @scope page', JsonPath.stringAt('$.Prefix'), true),
    )
    .otherwise(listPrefixes('First @scope page', JsonPath.stringAt('$.Prefix'), false))
    .afterwards()
    .next(
      new Map(this, 'For each @scope/pkg', { itemsPath: '$.response.CommonPrefixes', resultPath: JsonPath.DISCARD })
        .iterator(startPerPackage('Process scoped package')),
    );
  processNamespace.next(
    new Choice(this, 'Has more packages?')
      .when(Condition.isPresent('$.response.NextContinuationToken'), processNamespace)
      .otherwise(new Succeed(this, 'All Done')),
  );

  // ---- Top-level machine: enumerate everything under STORAGE_KEY_PREFIX ----
  const start = new Choice(this, 'Get prefix page')
    .when(
      Condition.isPresent('$.response.NextContinuationToken'),
      listPrefixes('Next prefixes page', STORAGE_KEY_PREFIX, true),
    )
    .otherwise(listPrefixes('First prefix page', STORAGE_KEY_PREFIX, false))
    .afterwards()
    .next(
      new Map(this, 'For each prefix', { itemsPath: '$.response.CommonPrefixes', resultPath: JsonPath.DISCARD })
        .iterator(
          new Choice(this, 'Is this a @scope/ prefix?')
            // Scoped prefixes need one more listing level before reaching packages.
            .when(Condition.stringMatches('$.Prefix', `${STORAGE_KEY_PREFIX}@*`), processNamespace)
            .otherwise(startPerPackage('Process unscoped package'))
            .afterwards(),
        ),
    );
  start.next(
    new Choice(this, 'Has more prefixes?')
      .when(Condition.isPresent('$.response.NextContinuationToken'), start)
      .otherwise(new Succeed(this, 'Done')),
  );

  this.stateMachine = new StateMachine(this, 'Resource', {
    definition: start,
    stateMachineName: stateMachineNameFrom(this.node.path),
    timeout: Duration.hours(4),
    tracingEnabled: true,
  });

  props.bucket.grantRead(processPackageVersions);
  props.bucket.grantRead(this.stateMachine);
}