def _compile()

in metaflow/plugins/aws/step_functions/step_functions.py [0:0]


    def _compile(self):
        if self.flow._flow_decorators.get("trigger") or self.flow._flow_decorators.get(
            "trigger_on_finish"
        ):
            raise StepFunctionsException(
                "Deploying flows with @trigger or @trigger_on_finish decorator(s) "
                "to AWS Step Functions is not supported currently."
            )

        # Visit every node of the flow and recursively build the state machine.
        def _visit(node, workflow, exit_node=None):
            if node.parallel_foreach:
                raise StepFunctionsException(
                    "Deploying flows with @parallel decorator(s) "
                    "to AWS Step Functions is not supported currently."
                )

            # Assign an AWS Batch job to the AWS Step Functions state
            # and pass the intermediate state by exposing `JobId` and
            # `Parameters` to the child job(s) as outputs. `Index` and
            # `SplitParentTaskId` are populated optionally, when available.

            # We can't modify the names of keys in AWS Step Functions aside
            # from a blessed few which are set as `Parameters` for the Map
            # state. That's why even though `JobId` refers to the parent task
            # id, we can't call it as such. Similar situation for `Parameters`.
            state = (
                State(node.name)
                .batch(self._batch(node))
                .output_path(
                    "$.['JobId', " "'Parameters', " "'Index', " "'SplitParentTaskId']"
                )
            )
            # End the (sub)workflow if we have reached the end of the flow or
            # the parent step of matching_join of the sub workflow.
            if node.type == "end" or exit_node in node.out_funcs:
                workflow.add_state(state.end())
            # Continue linear assignment within the (sub)workflow if the node
            # doesn't branch or fork.
            elif node.type in ("start", "linear", "join"):
                workflow.add_state(state.next(node.out_funcs[0]))
                _visit(self.graph[node.out_funcs[0]], workflow, exit_node)
            # Create a `Parallel` state and assign sub workflows if the node
            # branches out.
            elif node.type == "split":
                branch_name = hashlib.sha224(
                    "&".join(node.out_funcs).encode("utf-8")
                ).hexdigest()
                workflow.add_state(state.next(branch_name))
                branch = Parallel(branch_name).next(node.matching_join)
                # Generate as many sub workflows as branches and recurse.
                for n in node.out_funcs:
                    branch.branch(
                        _visit(
                            self.graph[n], Workflow(n).start_at(n), node.matching_join
                        )
                    )
                workflow.add_state(branch)
                # Continue the traversal from the matching_join.
                _visit(self.graph[node.matching_join], workflow, exit_node)
            # Create a `Map` state and assign sub workflow if the node forks.
            elif node.type == "foreach":
                # Fetch runtime cardinality via an AWS DynamoDb Get call before
                # configuring the node
                cardinality_state_name = "#%s" % node.out_funcs[0]
                workflow.add_state(state.next(cardinality_state_name))
                cardinality_state = (
                    State(cardinality_state_name)
                    .dynamo_db(SFN_DYNAMO_DB_TABLE, "$.JobId", "for_each_cardinality")
                    .result_path("$.Result")
                )
                iterator_name = "*%s" % node.out_funcs[0]
                workflow.add_state(cardinality_state.next(iterator_name))
                workflow.add_state(
                    Map(iterator_name)
                    .items_path("$.Result.Item.for_each_cardinality.NS")
                    .parameter("JobId.$", "$.JobId")
                    .parameter("SplitParentTaskId.$", "$.JobId")
                    .parameter("Parameters.$", "$.Parameters")
                    .parameter("Index.$", "$$.Map.Item.Value")
                    .next(
                        "%s_*GetManifest" % iterator_name
                        if self.use_distributed_map
                        else node.matching_join
                    )
                    .iterator(
                        _visit(
                            self.graph[node.out_funcs[0]],
                            Workflow(node.out_funcs[0])
                            .start_at(node.out_funcs[0])
                            .mode(
                                "DISTRIBUTED" if self.use_distributed_map else "INLINE"
                            ),
                            node.matching_join,
                        )
                    )
                    .max_concurrency(self.max_workers)
                    # AWS Step Functions has a short coming for DistributedMap at the
                    # moment that does not allow us to subset the output of for-each
                    # to just a single element. We have to rely on a rather terrible
                    # hack and resort to using ResultWriter to write the state to
                    # Amazon S3 and process it in another task. But, well what can we
                    # do...
                    .result_writer(
                        *(
                            (
                                (
                                    SFN_S3_DISTRIBUTED_MAP_OUTPUT_PATH[len("s3://") :]
                                    if SFN_S3_DISTRIBUTED_MAP_OUTPUT_PATH.startswith(
                                        "s3://"
                                    )
                                    else SFN_S3_DISTRIBUTED_MAP_OUTPUT_PATH
                                ).split("/", 1)
                                + [""]
                            )[:2]
                            if self.use_distributed_map
                            else (None, None)
                        )
                    )
                    .output_path("$" if self.use_distributed_map else "$.[0]")
                )
                if self.use_distributed_map:
                    workflow.add_state(
                        State("%s_*GetManifest" % iterator_name)
                        .resource("arn:aws:states:::aws-sdk:s3:getObject")
                        .parameter("Bucket.$", "$.ResultWriterDetails.Bucket")
                        .parameter("Key.$", "$.ResultWriterDetails.Key")
                        .next("%s_*Map" % iterator_name)
                        .result_selector("Body.$", "States.StringToJson($.Body)")
                    )
                    workflow.add_state(
                        Map("%s_*Map" % iterator_name)
                        .iterator(
                            Workflow("%s_*PassWorkflow" % iterator_name)
                            .mode("DISTRIBUTED")
                            .start_at("%s_*Pass" % iterator_name)
                            .add_state(
                                Pass("%s_*Pass" % iterator_name)
                                .end()
                                .parameter("Output.$", "States.StringToJson($.Output)")
                                .output_path("$.Output")
                            )
                        )
                        .next(node.matching_join)
                        .max_concurrency(1000)
                        .item_reader(
                            JSONItemReader()
                            .resource("arn:aws:states:::s3:getObject")
                            .parameter("Bucket.$", "$.Body.DestinationBucket")
                            .parameter("Key.$", "$.Body.ResultFiles.SUCCEEDED[0].Key")
                        )
                        .output_path("$.[0]")
                    )

                # Continue the traversal from the matching_join.
                _visit(self.graph[node.matching_join], workflow, exit_node)
            # We shouldn't ideally ever get here.
            else:
                raise StepFunctionsException(
                    "Node type *%s* for  step *%s* "
                    "is not currently supported by "
                    "AWS Step Functions." % (node.type, node.name)
                )
            return workflow

        workflow = Workflow(self.name).start_at("start")
        if self.workflow_timeout:
            workflow.timeout_seconds(self.workflow_timeout)
        return _visit(self.graph["start"], workflow)