# metaflow/plugins/aws/step_functions/step_functions.py
def _step_cli(self, node, paths, code_package_url, user_code_retries):
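    """
    Return the shell command that the AWS Batch job for `node` executes.

    The command re-applies dynamically attached decorators via top-level
    --with options, materializes the special `_parameters` task for the
    start step, exports foreach parent task ids for joins, and finally runs
    the Metaflow `step` command. `paths` are the input paths for the step
    and `user_code_retries` is the number of retries allowed for user code.
    """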
    cmds = []
    script_name = os.path.basename(sys.argv[0])
    executable = self.environment.executable(node.name)
    if R.use_r():
        entrypoint = [R.entrypoint()]
    else:
        entrypoint = [executable, script_name]
    # Use AWS Batch job identifier as the globally unique task identifier.
    task_id = "${AWS_BATCH_JOB_ID}"
    top_opts_dict = {
        "with": [
            decorator.make_decorator_spec()
            for decorator in node.decorators
            if not decorator.statically_defined
        ]
    }
    # FlowDecorators can define their own top-level options. They are
    # responsible for adding their own top-level options and values through
    # the get_top_level_options() hook. See similar logic in runtime.py.
    for deco in flow_decorators(self.flow):
        top_opts_dict.update(deco.get_top_level_options())
    top_opts = list(dict_to_cli_options(top_opts_dict))
    top_level = top_opts + [
        "--quiet",
        "--metadata=%s" % self.metadata.TYPE,
        "--environment=%s" % self.environment.TYPE,
        "--datastore=%s" % self.flow_datastore.TYPE,
        "--datastore-root=%s" % self.flow_datastore.datastore_root,
        "--event-logger=%s" % self.event_logger.TYPE,
        "--monitor=%s" % self.monitor.TYPE,
        "--no-pylint",
        "--with=step_functions_internal",
    ]
if node.name == "start":
# We need a separate unique ID for the special _parameters task
task_id_params = "%s-params" % task_id
# Export user-defined parameters into runtime environment
param_file = "".join(
random.choice(string.ascii_lowercase) for _ in range(10)
)
export_params = (
"python -m "
"metaflow.plugins.aws.step_functions.set_batch_environment "
"parameters %s && . `pwd`/%s" % (param_file, param_file)
)
params = (
entrypoint
+ top_level
+ [
"init",
"--run-id sfn-$METAFLOW_RUN_ID",
"--task-id %s" % task_id_params,
]
)
# Assign tags to run objects.
if self.tags:
params.extend("--tag %s" % tag for tag in self.tags)
# If the start step gets retried, we must be careful not to
# regenerate multiple parameters tasks. Hence, we check first if
# _parameters exists already.
exists = entrypoint + [
"dump",
"--max-value-size=0",
"sfn-${METAFLOW_RUN_ID}/_parameters/%s" % (task_id_params),
]
cmd = "if ! %s >/dev/null 2>/dev/null; then %s && %s; fi" % (
" ".join(exists),
export_params,
" ".join(params),
)
cmds.append(cmd)
paths = "sfn-${METAFLOW_RUN_ID}/_parameters/%s" % (task_id_params)
if node.type == "join" and self.graph[node.split_parents[-1]].type == "foreach":
parent_tasks_file = "".join(
random.choice(string.ascii_lowercase) for _ in range(10)
)
export_parent_tasks = (
"python -m "
"metaflow.plugins.aws.step_functions.set_batch_environment "
"parent_tasks %s && . `pwd`/%s" % (parent_tasks_file, parent_tasks_file)
)
cmds.append(export_parent_tasks)
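    # Build the `step` command that executes the user code for this node.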
    step = [
        "step",
        node.name,
        "--run-id sfn-$METAFLOW_RUN_ID",
        "--task-id %s" % task_id,
        # Since retries are handled by AWS Batch, we can rely on
        # AWS_BATCH_JOB_ATTEMPT as the job counter.
        "--retry-count $((AWS_BATCH_JOB_ATTEMPT-1))",
        "--max-user-code-retries %d" % user_code_retries,
        "--input-paths %s" % paths,
    ]
    if any(self.graph[n].type == "foreach" for n in node.in_funcs):
        # We set `METAFLOW_SPLIT_INDEX` through JSONPath-foo to pass the
        # state from the parent DynamoDB state for the foreach.
        step.append("--split-index $METAFLOW_SPLIT_INDEX")
    if self.tags:
        step.extend("--tag %s" % tag for tag in self.tags)
    if self.namespace is not None:
        step.append("--namespace=%s" % self.namespace)
    cmds.append(" ".join(entrypoint + top_level + step))
return " && ".join(cmds)