public ECSTaskSubmissionFromStepFunctionsPattern()

in amazon-ecs-java-starter-kit-cdk/src/main/java/software/aws/ecs/java/starterkit/cdk/ECSTaskSubmissionFromStepFunctionsPattern.java [81:313]


    public ECSTaskSubmissionFromStepFunctionsPattern(final Construct scope, final String id, final StackProps props) {
        super(scope, id, props);

        SubnetSelection privateSubnets = SubnetSelection.builder().subnetType(SubnetType.PRIVATE).build();
        // VPC
        Vpc vpc = Vpc.Builder.create(this, "StarterKitVPC").cidr("10.120.0.0/16").maxAzs(2)
                .gatewayEndpoints(new HashMap<String, GatewayVpcEndpointOptions>() {
                    private static final long serialVersionUID = 2479535941382804947L;
                    {
                        put("S3EndPoint", GatewayVpcEndpointOptions.builder().service(GatewayVpcEndpointAwsService.S3)
                                .subnets(new ArrayList<SubnetSelection>() {
                                    private static final long serialVersionUID = 1454955270027154519L;
                                    {
                                        add(privateSubnets);
                                    }
                                }).build());
                        put("DDBEndPoint",
                                GatewayVpcEndpointOptions.builder().service(GatewayVpcEndpointAwsService.DYNAMODB)
                                        .subnets(new ArrayList<SubnetSelection>() {
                                            private static final long serialVersionUID = -4389876763264722986L;
                                            {
                                                add(privateSubnets);
                                            }
                                        }).build());
                    }
                }).build();
        vpc.addInterfaceEndpoint("ECSEndPoint", InterfaceVpcEndpointOptions.builder()
                .service(InterfaceVpcEndpointAwsService.ECS).subnets(privateSubnets).build());
        vpc.addInterfaceEndpoint("ECSAgentEndPoint", InterfaceVpcEndpointOptions.builder()
                .service(InterfaceVpcEndpointAwsService.ECS_AGENT).subnets(privateSubnets).build());
        vpc.addInterfaceEndpoint("ECREndPoint", InterfaceVpcEndpointOptions.builder()
                .service(InterfaceVpcEndpointAwsService.ECR).subnets(privateSubnets).build());

        // ECS Tasks SecurityGroup
        SecurityGroup ecsSecurityGroup = SecurityGroup.Builder.create(this, "FargateSecurityGroup").vpc(vpc)
                .securityGroupName("amazon-ecs-java-starter-kit-pattern-2").allowAllOutbound(true).build();

        // S3 Bucket
        Bucket s3Bucket = Bucket.Builder.create(this, "S3Bucket")
                .bucketName(this.getAccount() + "-amazon-ecs-java-starter-kit-pattern-2-bucket")
                .blockPublicAccess(BlockPublicAccess.Builder.create()
                        .blockPublicAcls(true)
                        .blockPublicPolicy(true)
                        .ignorePublicAcls(true)
                        .restrictPublicBuckets(true)
                        .build())
                .removalPolicy(RemovalPolicy.DESTROY)
                .build();

        // DynamoDB Tables
        String workflowSummaryPartitionKeyName = "workflow_name";
        String workflowSummarySortKeyName = "workflow_run_id";
        Table workflow_summary = Table.Builder.create(this, "DDBWorkFlowSummary").tableName("workflow_summary_pattern_2")
                .removalPolicy(RemovalPolicy.DESTROY)
                .partitionKey(
                        Attribute.builder().name(workflowSummaryPartitionKeyName).type(AttributeType.STRING).build())
                .sortKey(Attribute.builder().name(workflowSummarySortKeyName).type(AttributeType.NUMBER).build())
                .build();

        String workflowDetailsPartitionKeyName = "workflow_run_id";
        String workflowDetailsSortKeyName = "ecs_task_id";
        Table workflow_details = Table.Builder.create(this, "DDBWorkFlowDetails").tableName("workflow_details_pattern_2")
                .removalPolicy(RemovalPolicy.DESTROY)
                .partitionKey(
                        Attribute.builder().name(workflowDetailsPartitionKeyName).type(AttributeType.NUMBER).build())
                .sortKey(Attribute.builder().name(workflowDetailsSortKeyName).type(AttributeType.STRING).build())
                .build();

        // ECS Cluster
        Cluster cluster = Cluster.Builder.create(this, "StarterKitCluster").clusterName("amazon-ecs-java-starter-kit-pattern-2")
                .vpc(vpc).build();

        // ECR Image
        String ecrRepoName = "amazon-ecs-java-starter-kit-pattern-2";
        @SuppressWarnings("deprecation")
        DockerImageAsset dockerImageAsset = DockerImageAsset.Builder.create(this, "StarterKitECRImage")
                .directory("../amazon-ecs-java-starter-kit-task").repositoryName(ecrRepoName).build();

        // Fargate Task Definition
        FargateTaskDefinition fargateTaskDefinition = FargateTaskDefinition.Builder
                .create(this, "StarterKitFargateTaskDefinition").family("amazon-ecs-java-starter-kit-pattern-2").cpu(1024)
                .memoryLimitMiB(2048).build();

        // Container Definition
        ContainerDefinition containerDefinition = fargateTaskDefinition.addContainer("amazon-ecs-java-starter-kit",
                ContainerDefinitionOptions.builder().essential(true)
                        .image(ContainerImage.fromDockerImageAsset(dockerImageAsset))
                        .logging(AwsLogDriver.awsLogs(AwsLogDriverProps.builder()
                                .logGroup(LogGroup.Builder.create(this, "ECSLogGroup")
                                        .logGroupName("/ecs/amazon-ecs-java-starter-kit-pattern-2")
                                        .removalPolicy(RemovalPolicy.DESTROY).retention(RetentionDays.ONE_DAY).build())
                                .streamPrefix("amazon-ecs-java-starter-kit").build()))
                        .build());

        // Container IAM permissions
        workflow_details.grantReadWriteData(fargateTaskDefinition.getTaskRole());
        s3Bucket.grantReadWrite(fargateTaskDefinition.getTaskRole());

        // TaskMonitor Lambda
        Function taskMonitor = Function.Builder.create(this, "TaskMonitorLambda")
                .functionName("amazon-ecs-java-starter-kit-pattern-2-ecs-task-monitor")
                .code(Code.fromAsset(
                        "../amazon-ecs-java-starter-kit-taskmonitor/target/amazon-ecs-java-starter-kit-taskmonitor-1.0.jar"))
                .handler("software.aws.ecs.java.starterkit.monitor.ECSTaskMonitor").runtime(Runtime.JAVA_8_CORRETTO)
                .timeout(Duration.minutes(5)).memorySize(256).logRetention(RetentionDays.ONE_DAY).vpc(vpc)
                .vpcSubnets(privateSubnets)
                .securityGroups(Collections.singletonList(SecurityGroup.Builder.create(this, "TaskMonitorSG").vpc(vpc)
                        .securityGroupName("amazon-ecs-java-starter-kit-pattern-2-ecs-task-Monitor").allowAllOutbound(true)
                        .build()))
                .environment(new HashMap<String, String>() {
                    private static final long serialVersionUID = -4232375236129537678L;
                    {
                        put("region", getRegion());
                        put("workflow_summary_ddb_table_name", workflow_summary.getTableName());
                        put("workflow_summary_hash_key", workflowSummaryPartitionKeyName);
                        put("workflow_summary_range_key", workflowSummarySortKeyName);
                        put("workflow_details_ddb_table_name", workflow_details.getTableName());
                        put("workflow_details_hash_key", workflowDetailsPartitionKeyName);
                        put("workflow_details_range_key", workflowDetailsSortKeyName);
                    }
                }).build();

        // IAM permissions for Lambdas
        workflow_details.grantReadWriteData(taskMonitor.getRole());
        workflow_summary.grantReadWriteData(taskMonitor.getRole());

        // Environment variables being passed to ECS Tasks
        ArrayList<TaskEnvironmentVariable> containerEnvVars = new ArrayList<TaskEnvironmentVariable>() {
            private static final long serialVersionUID = -7266629031441090205L;
            {
                add(EnvVarBuilder("region", Aws.REGION));
                add(EnvVarBuilder("workflow_details_ddb_table_name", workflow_details.getTableName()));
                add(EnvVarBuilder("workflow_details_hash_key", workflowDetailsPartitionKeyName));
                add(EnvVarBuilder("workflow_details_range_key", workflowDetailsSortKeyName));
                add(EnvVarBuilder("workflow_name", JsonPath.stringAt("$.workflowName")));
                add(EnvVarBuilder("workflow_run_id", JsonPath.stringAt("$.workflowRunId")));
                add(EnvVarBuilder("task_name", JsonPath.stringAt("$.taskName")));
                add(EnvVarBuilder("s3_bucket_name", JsonPath.stringAt("$.s3BucketName")));
                add(EnvVarBuilder("object_key", JsonPath.stringAt("$.objectKey")));
            }
        };

        // ECS Run Task State
        EcsRunTask ecsRunTask = EcsRunTask.Builder.create(this, "SubmitECSTasks").assignPublicIp(false).cluster(cluster)
                .taskDefinition(fargateTaskDefinition).subnets(privateSubnets)
                .securityGroups(Collections.singletonList(ecsSecurityGroup))
                .launchTarget(EcsFargateLaunchTarget.Builder.create().platformVersion(FargatePlatformVersion.VERSION1_4)
                        .build())
                .containerOverrides(Collections.singletonList(ContainerOverride.builder()
                        .containerDefinition(containerDefinition).environment(containerEnvVars).build()))
                .build();

        // Submit ECS tasks simultaneously using Map state
        Map ecsTasksSubmitter = Map.Builder.create(this, "S3CopyTaskRunner").parameters(new HashMap<String, String>() {
            private static final long serialVersionUID = -2748074145208985587L;
            {
                put("workflowRunId.$", "$.workflowRunId");
                put("workflowName.$", "$.workflowName");
                put("s3BucketName.$", "$.s3BucketName");
                put("taskName.$", "$$.Map.Item.Value.taskName");
                put("objectKey.$", "$$.Map.Item.Value.objectKey");
            }
        }).itemsPath("$.taskList").outputPath("$.[*].Tasks.[*].TaskArn")
                /**
                 * TODO: This line is not supported yet. See https://github.com/aws/aws-cdk/issues/9904
                 * .resultSelection("$.[*].Tasks.[*].TaskArn")
                 */
                .maxConcurrency(5).build().iterator(ecsRunTask);
        /**
         * TODO: Parallel Wrapper to work-around this issue with CDK
         * See https://github.com/aws/aws-cdk/issues/9904, So that we can pass through the input variables
         */
        Parallel parallelWrapper = Parallel.Builder.create(this, "KickOffInParallel").resultPath("$.paralleloutput")
                .build().branch(ecsTasksSubmitter);

        // Pass-Through State for passing through the ECS Task Arns
        Pass passThroughECSTasksArns = Pass.Builder.create(this, "PassThroughECSArns")
                .parameters(new HashMap<String, Object>() {
                    private static final long serialVersionUID = 4528431724317351553L;
                    {
                        put("iterator", new HashMap<String, Object>() {
                            private static final long serialVersionUID = 5348720279215832325L;
                            {
                                put("continue", false);
                                put("workflowRunId.$", "$.workflowRunId");
                                put("workflowName.$", "$.workflowName");
                                put("ecsTaskArns.$", "$.paralleloutput.[0]");
                            }
                        });
                    }
                }).build();

        // Monitor State Lambda invocation
        LambdaInvoke invokeMonitorState = LambdaInvoke.Builder.create(this, "InvokeTaskMonitor")
                .lambdaFunction(taskMonitor).resultPath("$.iterator").payloadResponseOnly(true).build();

        // State for sleeping for sometime
        Chain waitState = Wait.Builder.create(this, "WaitForECS").time(WaitTime.duration(Duration.seconds(120))).build()
                .next(invokeMonitorState);

        // Success State
        Succeed doneState = Succeed.Builder.create(this, "Done").build();

        // State for checking if tasks completed
        Choice checkTasksCompleted = Choice.Builder.create(this, "CheckIfTasksCompleted").build()
                .when(Condition.booleanEquals("$.iterator.continue", true), waitState).otherwise(doneState);

        // StateMachine
        StateMachine.Builder.create(this, "amazon-ecs-java-starter-kit")
                .stateMachineName("amazon-ecs-java-starter-kit-pattern-2").stateMachineType(StateMachineType.STANDARD)
                .definition(Chain.start(parallelWrapper).next(passThroughECSTasksArns).next(invokeMonitorState)
                        .next(checkTasksCompleted))
                .build();

        // Outputs
        CfnOutput.Builder.create(this, "region").value(this.getRegion()).build();
        CfnOutput.Builder.create(this, "clusterName").value(cluster.getClusterName()).build();
        CfnOutput.Builder.create(this, "containerName").value(containerDefinition.getContainerName()).build();
        CfnOutput.Builder.create(this, "taskDefinition")
                .value(Fn.select(1, Fn.split("/", fargateTaskDefinition.getTaskDefinitionArn()))).build();
        CfnOutput.Builder.create(this, "securityGroupId").value(vpc.getVpcDefaultSecurityGroup()).build();
        CfnOutput.Builder.create(this, "subnetIdLiteral").value(vpc.getPrivateSubnets().get(0).getSubnetId()).build();
        CfnOutput.Builder.create(this, "ddbTableNameWFSummary").value(workflow_summary.getTableName()).build();
        CfnOutput.Builder.create(this, "hashKeyWFSummary").value(workflowSummaryPartitionKeyName).build();
        CfnOutput.Builder.create(this, "rangeKeyWFSummary").value(workflowSummarySortKeyName).build();
        CfnOutput.Builder.create(this, "ddbTableNameWFDetails").value(workflow_details.getTableName()).build();
        CfnOutput.Builder.create(this, "hashKeyWFDetails").value(workflowDetailsPartitionKeyName).build();
        CfnOutput.Builder.create(this, "rangeKeyWFDetails").value(workflowDetailsSortKeyName).build();
        CfnOutput.Builder.create(this, "s3BucketName").value(s3Bucket.getBucketName()).build();
        CfnOutput.Builder.create(this, "separator").value("$").build();
        CfnOutput.Builder.create(this, "workflowName").value("amazon_ecs_starter_kit_pattern_2").build();
        CfnOutput.Builder.create(this, "workflowRunId").value("100001").build();
    }