def _step_cli()

in metaflow/plugins/airflow/airflow.py

Builds the shell command(s) that the generated Airflow task runs for a single Metaflow step: the flow's own CLI entrypoint invoked with top-level options and the "step" subcommand, plus, for the start step, an additional guarded "init" command that creates the special _parameters task at most once across retries.


    def _step_cli(self, node, paths, code_package_url, user_code_retries):
        cmds = []

        script_name = os.path.basename(sys.argv[0])
        executable = self.environment.executable(node.name)

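        # Command prefix: the environment-selected executable invoking the flow script itself.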
        entrypoint = [executable, script_name]

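        # Re-attach decorators that were added dynamically (i.e. not statically
        # defined in the flow code) through top-level --with options.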
        top_opts_dict = {
            "with": [
                decorator.make_decorator_spec()
                for decorator in node.decorators
                if not decorator.statically_defined
            ]
        }
        # FlowDecorators can define their own top-level options. They are
        # responsible for adding their own top-level options and values through
        # the get_top_level_options() hook. See similar logic in runtime.py.
        for deco in flow_decorators(self.flow):
            top_opts_dict.update(deco.get_top_level_options())

        top_opts = list(dict_to_cli_options(top_opts_dict))

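        # Top-level Metaflow CLI options shared by every command emitted below.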
        top_level = top_opts + [
            "--quiet",
            "--metadata=%s" % self.metadata.TYPE,
            "--environment=%s" % self.environment.TYPE,
            "--datastore=%s" % self.flow_datastore.TYPE,
            "--datastore-root=%s" % self.flow_datastore.datastore_root,
            "--event-logger=%s" % self.event_logger.TYPE,
            "--monitor=%s" % self.monitor.TYPE,
            "--no-pylint",
            "--with=airflow_internal",
        ]

        if node.name == "start":
            # We need a separate unique ID for the special _parameters task
            task_id_params = "%s-params" % AIRFLOW_MACROS.create_task_id(
                self.contains_foreach
            )
            # Export user-defined parameters into the runtime environment through
            # a randomly named temporary file.
            param_file = "".join(
                random.choice(string.ascii_lowercase) for _ in range(10)
            )
            # Write the parameter values out as shell exports and source the file
            # so the parameters become environment variables.
            export_params = (
                "python -m "
                "metaflow.plugins.airflow.plumbing.set_parameters %s "
                "&& . `pwd`/%s" % (param_file, param_file)
            )
            # The "init" command creates the parameters task for this run.
            params = (
                entrypoint
                + top_level
                + [
                    "init",
                    "--run-id %s" % AIRFLOW_MACROS.RUN_ID,
                    "--task-id %s" % task_id_params,
                ]
            )

            # Assign tags to run objects.
            if self.tags:
                params.extend("--tag %s" % tag for tag in self.tags)

            # If the start step gets retried, we must be careful not to
            # regenerate multiple parameters tasks. Hence, we check first if
            # _parameters exists already.
            exists = entrypoint + [
                # Dump the parameters task
                "dump",
                "--max-value-size=0",
                "%s/_parameters/%s" % (AIRFLOW_MACROS.RUN_ID, task_id_params),
            ]
            cmd = "if ! %s >/dev/null 2>/dev/null; then %s && %s; fi" % (
                " ".join(exists),
                export_params,
                " ".join(params),
            )
            cmds.append(cmd)
            # Point the start step's input paths at the parameters task.
            paths = "%s/_parameters/%s" % (AIRFLOW_MACROS.RUN_ID, task_id_params)

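        # The core step command: executes the user's step code, with the run/task
        # IDs and the attempt number supplied via Airflow macros.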
        step = [
            "step",
            node.name,
            "--run-id %s" % AIRFLOW_MACROS.RUN_ID,
            "--task-id %s" % AIRFLOW_MACROS.create_task_id(self.contains_foreach),
            "--retry-count %s" % AIRFLOW_MACROS.ATTEMPT,
            "--max-user-code-retries %d" % user_code_retries,
            "--input-paths %s" % paths,
        ]
        if self.tags:
            step.extend("--tag %s" % tag for tag in self.tags)
        if self.namespace is not None:
            step.append("--namespace=%s" % self.namespace)

        # Is the immediate parent a foreach node?
        parent_is_foreach = any(
            self.graph[n].type == "foreach" for n in node.in_funcs
        )
        if parent_is_foreach:
            step.append("--split-index %s" % AIRFLOW_MACROS.FOREACH_SPLIT_INDEX)

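        # Final command: the flow entrypoint plus top-level options, followed by
        # the step subcommand.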
        cmds.append(" ".join(entrypoint + top_level + step))
        return cmds
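
For illustration only, here is a rough sketch of the single command returned for a non-start step. It assumes the environment resolves the executable to "python", the flow script is "flow.py", the step is named "train" with one user-code retry, and no dynamically attached decorators or extra top-level options; <RUN_ID>, <TASK_ID>, <ATTEMPT>, and <INPUT_PATHS> stand in for the Airflow macro templates, and the backend option values are placeholders as well:

    # Hypothetical values throughout; not output captured from a real deployment.
    cmds = [
        "python flow.py --quiet --metadata=<METADATA> --environment=<ENVIRONMENT> "
        "--datastore=<DATASTORE> --datastore-root=<DATASTORE_ROOT> "
        "--event-logger=<LOGGER> --monitor=<MONITOR> --no-pylint "
        "--with=airflow_internal step train --run-id <RUN_ID> --task-id <TASK_ID> "
        "--retry-count <ATTEMPT> --max-user-code-retries 1 --input-paths <INPUT_PATHS>"
    ]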