in metaflow/plugins/airflow/airflow.py
def _step_cli(self, node, paths, code_package_url, user_code_retries):
    cmds = []
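    # `entrypoint` is the command prefix that re-invokes the user's flow
    # script with the environment-appropriate Python executable.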
    script_name = os.path.basename(sys.argv[0])
    executable = self.environment.executable(node.name)
    entrypoint = [executable, script_name]
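    # Recreate dynamically attached decorators (e.g. ones added with --with
    # at deploy time) by passing them back through --with; statically
    # defined decorators are already present in the user code.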
    top_opts_dict = {
        "with": [
            decorator.make_decorator_spec()
            for decorator in node.decorators
            if not decorator.statically_defined
        ]
    }
    # FlowDecorators can define their own top-level options. They are
    # responsible for adding their own top-level options and values through
    # the get_top_level_options() hook. See similar logic in runtime.py.
    for deco in flow_decorators(self.flow):
        top_opts_dict.update(deco.get_top_level_options())
    top_opts = list(dict_to_cli_options(top_opts_dict))
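    # Top-level options shared by every command in this task: they pin the
    # metadata provider, environment, and datastore so that the Airflow
    # worker reproduces the deploy-time configuration.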
    top_level = top_opts + [
        "--quiet",
        "--metadata=%s" % self.metadata.TYPE,
        "--environment=%s" % self.environment.TYPE,
        "--datastore=%s" % self.flow_datastore.TYPE,
        "--datastore-root=%s" % self.flow_datastore.datastore_root,
        "--event-logger=%s" % self.event_logger.TYPE,
        "--monitor=%s" % self.monitor.TYPE,
        "--no-pylint",
        "--with=airflow_internal",
    ]
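    # The start step is special: before it executes, flow parameters must be
    # persisted to the datastore as a separate `_parameters` task via `init`.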
    if node.name == "start":
        # We need a separate unique ID for the special _parameters task.
        task_id_params = "%s-params" % AIRFLOW_MACROS.create_task_id(
            self.contains_foreach
        )
        # Export user-defined parameters into the runtime environment via a
        # randomly named temporary file.
        param_file = "".join(
            random.choice(string.ascii_lowercase) for _ in range(10)
        )
        # set_parameters writes the parameter values to the temporary file
        # as shell `export` statements; sourcing the file makes them visible
        # as environment variables to the `init` call below.
        export_params = (
            "python -m "
            "metaflow.plugins.airflow.plumbing.set_parameters %s "
            "&& . `pwd`/%s" % (param_file, param_file)
        )
        # Build the `init` command that persists the run's parameters as the
        # _parameters task.
        params = (
            entrypoint
            + top_level
            + [
                "init",
                "--run-id %s" % AIRFLOW_MACROS.RUN_ID,
                "--task-id %s" % task_id_params,
            ]
        )
        # Assign tags to run objects.
        if self.tags:
            params.extend("--tag %s" % tag for tag in self.tags)
        # If the start step gets retried, we must be careful not to
        # regenerate multiple parameters tasks. Hence, we check first if
        # _parameters exists already.
        exists = entrypoint + [
            # Dump the parameters task
            "dump",
            "--max-value-size=0",
            "%s/_parameters/%s" % (AIRFLOW_MACROS.RUN_ID, task_id_params),
        ]
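        # `dump` fails when the task does not exist yet, so the export +
        # init pair runs at most once per run, even across retries.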
cmd = "if ! %s >/dev/null 2>/dev/null; then %s && %s; fi" % (
" ".join(exists),
export_params,
" ".join(params),
)
cmds.append(cmd)
        # Point the step's input paths at the freshly created parameters task.
        paths = "%s/_parameters/%s" % (AIRFLOW_MACROS.RUN_ID, task_id_params)
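    # Assemble the `step` command itself. The run ID, task ID, and attempt
    # number are Airflow template macros, rendered only when the task runs.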
    step = [
        "step",
        node.name,
        "--run-id %s" % AIRFLOW_MACROS.RUN_ID,
        "--task-id %s" % AIRFLOW_MACROS.create_task_id(self.contains_foreach),
        "--retry-count %s" % AIRFLOW_MACROS.ATTEMPT,
        "--max-user-code-retries %d" % user_code_retries,
        "--input-paths %s" % paths,
    ]
    if self.tags:
        step.extend("--tag %s" % tag for tag in self.tags)
    if self.namespace is not None:
        step.append("--namespace=%s" % self.namespace)
    parent_is_foreach = any(  # The immediate parent is a foreach node.
        self.graph[n].type == "foreach" for n in node.in_funcs
    )
    if parent_is_foreach:
        step.append("--split-index %s" % AIRFLOW_MACROS.FOREACH_SPLIT_INDEX)
    cmds.append(" ".join(entrypoint + top_level + step))
    return cmds
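
# Illustrative sketch, not part of the original source: for a non-start step,
# the single command appended above renders along the lines of
#
#   <executable> <script_name> <top-level options> --with=airflow_internal \
#       step <step name> --run-id <AIRFLOW_MACROS.RUN_ID> \
#       --task-id <AIRFLOW_MACROS task id> \
#       --retry-count <AIRFLOW_MACROS.ATTEMPT> \
#       --max-user-code-retries <n> --input-paths <paths>
#
# where the AIRFLOW_MACROS placeholders are Airflow template strings that the
# scheduler renders at task execution time.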