in source/src/molecule-unfolding/cdk/construct-benchmark.ts [401:500]
private createHPCStateMachine(): sfn.StateMachine {
const parametersLambdaStep = new tasks.LambdaInvoke(this, 'Get Task Parameters', {
lambdaFunction: this.taskParamLambda,
payload: sfn.TaskInput.fromObject({
"s3_bucket": this.props.bucket.bucketName,
"s3_prefix": this.props.prefix,
"param_type": "PARAMS_FOR_HPC",
"execution_id": sfn.JsonPath.stringAt("$.execution_id"),
"context.$": "$$"
}),
outputPath: '$.Payload'
});
const hpcJobQueue = this.batchUtil.getHpcJobQueue()
const jobDef = this.batchUtil.createHPCBatchJobDef("HPCJob_Template", 2, 4);
const stateJson = {
End: true,
Type: "Task",
Resource: "arn:aws:states:::batch:submitJob.sync",
Parameters: {
JobDefinition: jobDef.jobDefinitionArn,
"JobName.$": "States.Format('HPCTask{}-{}', $.ItemIndex, $.ItemValue.task_name)",
JobQueue: hpcJobQueue.jobQueueArn,
ContainerOverrides: {
"Command.$": "$.ItemValue.params",
ResourceRequirements: [{
Type: "VCPU",
"Value.$": "States.Format('{}',$.ItemValue.vcpus)"
},
{
Type: "MEMORY",
"Value.$": "States.Format('{}', $.ItemValue.memory)"
}
]
}
},
"ResultSelector": {
"JobId.$": "$.JobId",
"JobName.$": "$.JobName"
}
};
const customBatchSubmitJob = new sfn.CustomState(this, 'Run HPC Batch Task', {
stateJson
});
const parallelHPCJobsMap = new sfn.Map(this, 'ParallelHPCJobs', {
maxConcurrency: 20,
itemsPath: sfn.JsonPath.stringAt('$.hpcTaskParams'),
parameters: {
"ItemIndex.$": "$$.Map.Item.Index",
"ItemValue.$": "$$.Map.Item.Value",
"execution_id.$": "$.execution_id"
},
resultPath: "$.parallelHPCJobsMap"
});
parallelHPCJobsMap.iterator(customBatchSubmitJob);
const chain = sfn.Chain.start(parametersLambdaStep).next(parallelHPCJobsMap);
const logGroupName = `${this.props.stackName}-HPCStateMachineLogGroup`
grantKmsKeyPerm(this.logKey, logGroupName)
const logGroup = new logs.LogGroup(this, 'HPCStateMachineLogGroup', {
encryptionKey: this.logKey,
logGroupName,
removalPolicy: cdk.RemovalPolicy.DESTROY,
retention: logs.RetentionDays.THREE_MONTHS
});
const hpcStateMachine = new sfn.StateMachine(this, 'HPCStateMachine', {
definition: chain,
logs: {
destination: logGroup,
level: sfn.LogLevel.ALL,
},
});
logGroup.grantWrite(hpcStateMachine)
hpcStateMachine.role.addToPrincipalPolicy(new iam.PolicyStatement({
resources: [
`arn:aws:batch:${this.props.region}:${this.props.account}:job-definition/*`,
`arn:aws:batch:${this.props.region}:${this.props.account}:job-queue/*`
],
actions: [
"batch:SubmitJob",
]
}));
hpcStateMachine.role.addToPrincipalPolicy(new iam.PolicyStatement({
resources: [
`arn:aws:events:${this.props.region}:${this.props.account}:rule/*`,
],
actions: [
"events:PutTargets",
"events:PutRule",
"events:DescribeRule"
]
}));
return hpcStateMachine
}