private createHPCStateMachine()

in source/src/molecule-unfolding/cdk/construct-benchmark.ts [401:500]


    /**
     * Assembles the HPC state machine.
     *
     * The workflow consists of two steps:
     *  1. "Get Task Parameters" invokes `taskParamLambda` with the bucket name,
     *     prefix, execution id and the Step Functions context object, and keeps
     *     only `$.Payload` of the response as the state output.
     *  2. "ParallelHPCJobs" iterates over `$.hpcTaskParams` (at most 20 items
     *     concurrently) and runs one AWS Batch job per item through the
     *     `batch:submitJob.sync` service integration.
     *
     * Execution logs (level ALL) are written to a dedicated log group that is
     * encrypted with `logKey`.
     *
     * @returns the newly created state machine
     */
    private createHPCStateMachine(): sfn.StateMachine {
        // Step 1: ask the parameter lambda for the list of HPC tasks to run.
        const fetchParamsPayload = sfn.TaskInput.fromObject({
            "s3_bucket": this.props.bucket.bucketName,
            "s3_prefix": this.props.prefix,
            "param_type": "PARAMS_FOR_HPC",
            "execution_id": sfn.JsonPath.stringAt("$.execution_id"),
            // "$$" is the Step Functions context object of the running execution.
            "context.$": "$$"
        });
        const fetchParamsTask = new tasks.LambdaInvoke(this, 'Get Task Parameters', {
            lambdaFunction: this.taskParamLambda,
            payload: fetchParamsPayload,
            outputPath: '$.Payload'
        });

        const batchJobQueue = this.batchUtil.getHpcJobQueue();
        const batchJobDefinition = this.batchUtil.createHPCBatchJobDef("HPCJob_Template", 2, 4);

        // Step 2: raw ASL for the Batch task. vCPU and memory come from each map
        // item; States.Format('{}', ...) renders them as strings
        // (presumably because Batch expects string values here - TODO confirm).
        const resourceRequirements = [
            { Type: "VCPU", "Value.$": "States.Format('{}',$.ItemValue.vcpus)" },
            { Type: "MEMORY", "Value.$": "States.Format('{}', $.ItemValue.memory)" }
        ];
        const containerOverrides = {
            "Command.$": "$.ItemValue.params",
            ResourceRequirements: resourceRequirements
        };
        const submitJobParameters = {
            JobDefinition: batchJobDefinition.jobDefinitionArn,
            "JobName.$": "States.Format('HPCTask{}-{}', $.ItemIndex, $.ItemValue.task_name)",
            JobQueue: batchJobQueue.jobQueueArn,
            ContainerOverrides: containerOverrides
        };
        const runBatchTask = new sfn.CustomState(this, 'Run HPC Batch Task', {
            stateJson: {
                End: true,
                Type: "Task",
                Resource: "arn:aws:states:::batch:submitJob.sync",
                Parameters: submitJobParameters,
                // Keep only the job id and name from the Batch result.
                "ResultSelector": {
                    "JobId.$": "$.JobId",
                    "JobName.$": "$.JobName"
                }
            }
        });

        // Fan out over the task list; every iteration receives its index, its
        // item and the execution id.
        const fanOutJobs = new sfn.Map(this, 'ParallelHPCJobs', {
            maxConcurrency: 20,
            itemsPath: sfn.JsonPath.stringAt('$.hpcTaskParams'),
            parameters: {
                "ItemIndex.$": "$$.Map.Item.Index",
                "ItemValue.$": "$$.Map.Item.Value",
                "execution_id.$": "$.execution_id"
            },
            resultPath: "$.parallelHPCJobsMap"
        });
        fanOutJobs.iterator(runBatchTask);

        const workflowStart = sfn.Chain.start(fetchParamsTask);
        const workflow = workflowStart.next(fanOutJobs);

        // Encrypted log group; the KMS key permission is granted before the
        // log group itself is declared.
        const stateMachineLogGroupName = `${this.props.stackName}-HPCStateMachineLogGroup`;
        grantKmsKeyPerm(this.logKey, stateMachineLogGroupName);

        const stateMachineLogGroup = new logs.LogGroup(this, 'HPCStateMachineLogGroup', {
            encryptionKey: this.logKey,
            logGroupName: stateMachineLogGroupName,
            removalPolicy: cdk.RemovalPolicy.DESTROY,
            retention: logs.RetentionDays.THREE_MONTHS
        });

        const hpcStateMachine = new sfn.StateMachine(this, 'HPCStateMachine', {
            definition: workflow,
            logs: {
                destination: stateMachineLogGroup,
                level: sfn.LogLevel.ALL,
            },
        });
        stateMachineLogGroup.grantWrite(hpcStateMachine);

        // Permissions for the Batch task are attached to the execution role
        // explicitly: job submission plus the EventBridge rule actions.
        const executionRole = hpcStateMachine.role;
        const batchArnPrefix = `arn:aws:batch:${this.props.region}:${this.props.account}`;
        const eventsArnPrefix = `arn:aws:events:${this.props.region}:${this.props.account}`;

        executionRole.addToPrincipalPolicy(new iam.PolicyStatement({
            resources: [
                `${batchArnPrefix}:job-definition/*`,
                `${batchArnPrefix}:job-queue/*`
            ],
            actions: ["batch:SubmitJob"]
        }));
        executionRole.addToPrincipalPolicy(new iam.PolicyStatement({
            resources: [`${eventsArnPrefix}:rule/*`],
            actions: [
                "events:PutTargets",
                "events:PutRule",
                "events:DescribeRule"
            ]
        }));

        return hpcStateMachine;
    }