in packages/constructs/L3/ai/gaia-l3-construct/lib/rag-engines/data-import/file-import-workflow.ts [27:157]
  constructor(scope: Construct, id: string, props: FileImportWorkflowProps) {
    super(scope, id);
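
    // Expected state-machine execution input (inferred from the JsonPath
    // references used below):
    //   {
    //     "workspace_id": "...",
    //     "document_id": "...",
    //     "input_bucket_name": "...",
    //     "input_object_key": "...",
    //     "processing_bucket_name": "...",
    //     "processing_object_key": "..."
    //   }

    // Mark the document as "processing" in the documents table before the
    // import job starts, so the import status can be observed while it runs.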
    const setProcessing = new tasks.DynamoUpdateItem(this, 'SetProcessing', {
      table: props.ragDynamoDBTables.documentsTable,
      key: {
        workspace_id: tasks.DynamoAttributeValue.fromString(sfn.JsonPath.stringAt('$.workspace_id')),
        document_id: tasks.DynamoAttributeValue.fromString(sfn.JsonPath.stringAt('$.document_id')),
      },
      updateExpression: 'set #status=:statusValue',
      expressionAttributeNames: {
        '#status': 'status',
      },
      expressionAttributeValues: {
        ':statusValue': tasks.DynamoAttributeValue.fromString('processing'),
      },
      resultPath: sfn.JsonPath.DISCARD,
    });
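
    // Counterpart task: mark the document as "processed" once the Batch job
    // completes, then end the workflow with an explicit Succeed state.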
    const setProcessed = new tasks.DynamoUpdateItem(this, 'SetProcessed', {
      table: props.ragDynamoDBTables.documentsTable,
      key: {
        workspace_id: tasks.DynamoAttributeValue.fromString(sfn.JsonPath.stringAt('$.workspace_id')),
        document_id: tasks.DynamoAttributeValue.fromString(sfn.JsonPath.stringAt('$.document_id')),
      },
      updateExpression: 'set #status=:statusValue',
      expressionAttributeNames: {
        '#status': 'status',
      },
      expressionAttributeValues: {
        ':statusValue': tasks.DynamoAttributeValue.fromString('processed'),
      },
      resultPath: sfn.JsonPath.DISCARD,
    }).next(new sfn.Succeed(this, 'Success'));
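
    // Submit the file-import AWS Batch job and wait for it to finish via the
    // "Run a Job" (.sync) service integration. A CustomState embeds the raw
    // ASL for this task, including JsonPath ('Value.$') bindings for the
    // container environment overrides and a States.Format(...) job name.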
    const fileImportJob = new sfn.CustomState(this, 'FileImportJob', {
      stateJson: {
        Type: 'Task',
        Resource: `arn:${cdk.Aws.PARTITION}:states:::batch:submitJob.sync`,
        Parameters: {
          JobDefinition: props.fileImportBatchJob.fileImportJob.jobDefinitionArn,
          'JobName.$': "States.Format('FileImport-{}-{}', $.workspace_id, $.document_id)",
          JobQueue: props.fileImportBatchJob.jobQueue.jobQueueArn,
          ContainerOverrides: {
            Environment: [
              {
                Name: 'WORKSPACE_ID',
                'Value.$': '$.workspace_id',
              },
              {
                Name: 'DOCUMENT_ID',
                'Value.$': '$.document_id',
              },
              {
                Name: 'INPUT_BUCKET_NAME',
                'Value.$': '$.input_bucket_name',
              },
              {
                Name: 'INPUT_OBJECT_KEY',
                'Value.$': '$.input_object_key',
              },
              {
                Name: 'PROCESSING_BUCKET_NAME',
                'Value.$': '$.processing_bucket_name',
              },
              {
                Name: 'PROCESSING_OBJECT_KEY',
                'Value.$': '$.processing_object_key',
              },
            ],
          },
        },
        ResultPath: '$.job',
      },
    });
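
    // Dedicated CloudWatch log group for the state machine, created through
    // the MDAA construct so it picks up framework naming and KMS encryption.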
    const logGroup: logs.LogGroup = new MdaaLogGroup(this, 'file-import-log-group', {
      naming: props.naming,
      createParams: false,
      createOutputs: false,
      logGroupName: 'file-import',
      logGroupNamePathPrefix: '/aws/stepfunction/',
      encryptionKey: props.encryptionKey,
      retention: logs.RetentionDays.INFINITE,
    });
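
    // Chain the states: set "processing" -> run the Batch job -> set "processed".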
    const workflow = setProcessing.next(fileImportJob).next(setProcessed);

    const stateMachine = new sfn.StateMachine(this, 'FileImportStateMachine', {
      definitionBody: sfn.DefinitionBody.fromChainable(workflow),
      timeout: cdk.Duration.hours(12),
      comment: 'File import workflow',
      tracingEnabled: true,
      logs: {
        destination: logGroup,
        level: sfn.LogLevel.ALL,
      },
    });
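
    // Let the state machine role use the CMK (for encrypted logging), and
    // grant the EventBridge permissions the .sync integration needs to manage
    // its Batch-completion rule. The managed rule name is not known at synth
    // time, hence the wildcard resource (suppressed below).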
    props.encryptionKey.grantEncryptDecrypt(stateMachine);
    stateMachine.addToRolePolicy(
      new iam.PolicyStatement({
        actions: ['events:PutRule', 'events:PutTargets', 'events:DescribeRule'],
        resources: ['*'],
      }),
    );
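
    // batch:SubmitJob is scoped to the specific job queue and job definition.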
    stateMachine.addToRolePolicy(
      new iam.PolicyStatement({
        actions: ['batch:SubmitJob'],
        resources: [
          props.fileImportBatchJob.jobQueue.jobQueueArn,
          props.fileImportBatchJob.fileImportJob.jobDefinitionArn,
        ],
      }),
    );
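
    // cdk-nag suppressions for the wildcard events resource and for the
    // inline policies that the MDAA framework manages on this role.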
    MdaaNagSuppressions.addCodeResourceSuppressions(
      stateMachine,
      [
        {
          id: 'AwsSolutions-IAM5',
          reason: 'EventBridge rule created by the Batch .sync integration is not known at deployment time.',
        },
        { id: 'NIST.800.53.R5-IAMNoInlinePolicy', reason: 'Inline policy managed by MDAA framework.' },
        { id: 'HIPAA.Security-IAMNoInlinePolicy', reason: 'Inline policy managed by MDAA framework.' },
        { id: 'PCI.DSS.321-IAMNoInlinePolicy', reason: 'Inline policy managed by MDAA framework.' },
      ],
      true,
    );
    this.stateMachine = stateMachine;
  }
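
  // Usage sketch (hypothetical instantiation; assumes the enclosing class is
  // FileImportWorkflow and that the surrounding stack already provides these
  // props):
  //
  //   const fileImportWorkflow = new FileImportWorkflow(this, 'FileImportWorkflow', {
  //     fileImportBatchJob,
  //     ragDynamoDBTables,
  //     naming,
  //     encryptionKey,
  //   });
  //   // fileImportWorkflow.stateMachine can then be started by an upstream trigger.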