def _step_cli()

in metaflow/plugins/aws/step_functions/step_functions.py [0:0]


    def _step_cli(self, node, paths, code_package_url, user_code_retries):
        cmds = []

        script_name = os.path.basename(sys.argv[0])
        executable = self.environment.executable(node.name)

        if R.use_r():
            entrypoint = [R.entrypoint()]
        else:
            entrypoint = [executable, script_name]

        # Use AWS Batch job identifier as the globally unique task identifier.
        task_id = "${AWS_BATCH_JOB_ID}"
        top_opts_dict = {
            "with": [
                decorator.make_decorator_spec()
                for decorator in node.decorators
                if not decorator.statically_defined
            ]
        }
        # FlowDecorators can define their own top-level options. They are
        # responsible for adding their own top-level options and values through
        # the get_top_level_options() hook. See similar logic in runtime.py.
        for deco in flow_decorators(self.flow):
            top_opts_dict.update(deco.get_top_level_options())

        top_opts = list(dict_to_cli_options(top_opts_dict))

        top_level = top_opts + [
            "--quiet",
            "--metadata=%s" % self.metadata.TYPE,
            "--environment=%s" % self.environment.TYPE,
            "--datastore=%s" % self.flow_datastore.TYPE,
            "--datastore-root=%s" % self.flow_datastore.datastore_root,
            "--event-logger=%s" % self.event_logger.TYPE,
            "--monitor=%s" % self.monitor.TYPE,
            "--no-pylint",
            "--with=step_functions_internal",
        ]

        if node.name == "start":
            # We need a separate unique ID for the special _parameters task
            task_id_params = "%s-params" % task_id
            # Export user-defined parameters into runtime environment
            param_file = "".join(
                random.choice(string.ascii_lowercase) for _ in range(10)
            )
            export_params = (
                "python -m "
                "metaflow.plugins.aws.step_functions.set_batch_environment "
                "parameters %s && . `pwd`/%s" % (param_file, param_file)
            )
            params = (
                entrypoint
                + top_level
                + [
                    "init",
                    "--run-id sfn-$METAFLOW_RUN_ID",
                    "--task-id %s" % task_id_params,
                ]
            )
            # Assign tags to run objects.
            if self.tags:
                params.extend("--tag %s" % tag for tag in self.tags)

            # If the start step gets retried, we must be careful not to
            # regenerate multiple parameters tasks. Hence, we check first if
            # _parameters exists already.
            exists = entrypoint + [
                "dump",
                "--max-value-size=0",
                "sfn-${METAFLOW_RUN_ID}/_parameters/%s" % (task_id_params),
            ]
            cmd = "if ! %s >/dev/null 2>/dev/null; then %s && %s; fi" % (
                " ".join(exists),
                export_params,
                " ".join(params),
            )
            cmds.append(cmd)
            paths = "sfn-${METAFLOW_RUN_ID}/_parameters/%s" % (task_id_params)

        if node.type == "join" and self.graph[node.split_parents[-1]].type == "foreach":
            parent_tasks_file = "".join(
                random.choice(string.ascii_lowercase) for _ in range(10)
            )
            export_parent_tasks = (
                "python -m "
                "metaflow.plugins.aws.step_functions.set_batch_environment "
                "parent_tasks %s && . `pwd`/%s" % (parent_tasks_file, parent_tasks_file)
            )
            cmds.append(export_parent_tasks)

        step = [
            "step",
            node.name,
            "--run-id sfn-$METAFLOW_RUN_ID",
            "--task-id %s" % task_id,
            # Since retries are handled by AWS Batch, we can rely on
            # AWS_BATCH_JOB_ATTEMPT as the job counter.
            "--retry-count $((AWS_BATCH_JOB_ATTEMPT-1))",
            "--max-user-code-retries %d" % user_code_retries,
            "--input-paths %s" % paths,
        ]
        if any(self.graph[n].type == "foreach" for n in node.in_funcs):
            # We set the `METAFLOW_SPLIT_INDEX` through JSONPath-foo
            # to pass the state from the parent DynamoDb state for for-each.
            step.append("--split-index $METAFLOW_SPLIT_INDEX")
        if self.tags:
            step.extend("--tag %s" % tag for tag in self.tags)
        if self.namespace is not None:
            step.append("--namespace=%s" % self.namespace)
        cmds.append(" ".join(entrypoint + top_level + step))
        return " && ".join(cmds)