in metaflow/plugins/aws/step_functions/step_functions.py [0:0]
def _compile(self):
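# Compile the flow graph into an AWS Step Functions `Workflow`
# (state machine) object, starting the traversal from the `start` step.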
if self.flow._flow_decorators.get("trigger") or self.flow._flow_decorators.get(
"trigger_on_finish"
):
raise StepFunctionsException(
"Deploying flows with @trigger or @trigger_on_finish decorator(s) "
"to AWS Step Functions is not supported currently."
)
# Visit every node of the flow and recursively build the state machine.
def _visit(node, workflow, exit_node=None):
if node.parallel_foreach:
raise StepFunctionsException(
"Deploying flows with @parallel decorator(s) "
"to AWS Step Functions is not supported currently."
)
# Assign an AWS Batch job to the AWS Step Functions state
# and pass the intermediate state by exposing `JobId` and
# `Parameters` to the child job(s) as outputs. `Index` and
# `SplitParentTaskId` are populated optionally, when available.
# We can't modify the names of keys in AWS Step Functions aside
# from a blessed few, which are set as `Parameters` for the Map
# state. That's why, even though `JobId` refers to the parent task
# id, we can't name it as such; the same holds for `Parameters`.
state = (
State(node.name)
.batch(self._batch(node))
.output_path(
"$.['JobId', " "'Parameters', " "'Index', " "'SplitParentTaskId']"
)
)
# End the (sub)workflow if we have reached the end of the flow or
# a step whose successor is the matching join of the sub workflow.
if node.type == "end" or exit_node in node.out_funcs:
workflow.add_state(state.end())
# Continue linear assignment within the (sub)workflow if the node
# doesn't branch or fork.
elif node.type in ("start", "linear", "join"):
workflow.add_state(state.next(node.out_funcs[0]))
_visit(self.graph[node.out_funcs[0]], workflow, exit_node)
# Create a `Parallel` state and assign sub workflows if the node
# branches out.
elif node.type == "split":
branch_name = hashlib.sha224(
"&".join(node.out_funcs).encode("utf-8")
).hexdigest()
workflow.add_state(state.next(branch_name))
branch = Parallel(branch_name).next(node.matching_join)
# Generate as many sub workflows as branches and recurse.
for n in node.out_funcs:
branch.branch(
_visit(
self.graph[n], Workflow(n).start_at(n), node.matching_join
)
)
workflow.add_state(branch)
# Continue the traversal from the matching_join.
_visit(self.graph[node.matching_join], workflow, exit_node)
# Create a `Map` state and assign a sub workflow if the node forks.
elif node.type == "foreach":
# Fetch the runtime foreach cardinality via an AWS DynamoDB GetItem
# call before configuring the Map state.
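# The foreach parent task records its cardinality in this table at
# runtime, keyed by the `JobId` exposed above, which is what the
# GetItem call looks up.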
cardinality_state_name = "#%s" % node.out_funcs[0]
workflow.add_state(state.next(cardinality_state_name))
cardinality_state = (
State(cardinality_state_name)
.dynamo_db(SFN_DYNAMO_DB_TABLE, "$.JobId", "for_each_cardinality")
.result_path("$.Result")
)
iterator_name = "*%s" % node.out_funcs[0]
workflow.add_state(cardinality_state.next(iterator_name))
workflow.add_state(
Map(iterator_name)
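# `NS` is the DynamoDB number-set attribute returned by the GetItem
# call; it holds one entry per split index to iterate over.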
.items_path("$.Result.Item.for_each_cardinality.NS")
.parameter("JobId.$", "$.JobId")
.parameter("SplitParentTaskId.$", "$.JobId")
.parameter("Parameters.$", "$.Parameters")
.parameter("Index.$", "$$.Map.Item.Value")
.next(
"%s_*GetManifest" % iterator_name
if self.use_distributed_map
else node.matching_join
)
.iterator(
_visit(
self.graph[node.out_funcs[0]],
Workflow(node.out_funcs[0])
.start_at(node.out_funcs[0])
.mode(
"DISTRIBUTED" if self.use_distributed_map else "INLINE"
),
node.matching_join,
)
)
.max_concurrency(self.max_workers)
# AWS Step Functions' Distributed Map currently has a shortcoming:
# it does not allow us to subset the output of the for-each to just
# a single element. We have to rely on a rather terrible hack and
# resort to using ResultWriter to write the state to Amazon S3 and
# process it in another task. But, well, what can we do...
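# The expression below strips an optional `s3://` scheme from the
# configured output path and splits it into a (bucket, prefix) pair
# for the result writer; without Distributed Map it falls back to
# (None, None).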
.result_writer(
*(
(
(
SFN_S3_DISTRIBUTED_MAP_OUTPUT_PATH[len("s3://") :]
if SFN_S3_DISTRIBUTED_MAP_OUTPUT_PATH.startswith(
"s3://"
)
else SFN_S3_DISTRIBUTED_MAP_OUTPUT_PATH
).split("/", 1)
+ [""]
)[:2]
if self.use_distributed_map
else (None, None)
)
)
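# An inline Map returns a list with one output per iteration, of
# which only the first element is passed on; with Distributed Map
# the ResultWriter details are forwarded to the *GetManifest state.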
.output_path("$" if self.use_distributed_map else "$.[0]")
)
if self.use_distributed_map:
workflow.add_state(
State("%s_*GetManifest" % iterator_name)
.resource("arn:aws:states:::aws-sdk:s3:getObject")
.parameter("Bucket.$", "$.ResultWriterDetails.Bucket")
.parameter("Key.$", "$.ResultWriterDetails.Key")
.next("%s_*Map" % iterator_name)
.result_selector("Body.$", "States.StringToJson($.Body)")
)
workflow.add_state(
Map("%s_*Map" % iterator_name)
.iterator(
Workflow("%s_*PassWorkflow" % iterator_name)
.mode("DISTRIBUTED")
.start_at("%s_*Pass" % iterator_name)
.add_state(
Pass("%s_*Pass" % iterator_name)
.end()
.parameter("Output.$", "States.StringToJson($.Output)")
.output_path("$.Output")
)
)
.next(node.matching_join)
.max_concurrency(1000)
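# Read the iteration outputs back from the results file uploaded by
# ResultWriter; the manifest lists it under ResultFiles.SUCCEEDED.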
.item_reader(
JSONItemReader()
.resource("arn:aws:states:::s3:getObject")
.parameter("Bucket.$", "$.Body.DestinationBucket")
.parameter("Key.$", "$.Body.ResultFiles.SUCCEEDED[0].Key")
)
.output_path("$.[0]")
)
# Continue the traversal from the matching_join.
_visit(self.graph[node.matching_join], workflow, exit_node)
# Ideally, we should never get here.
else:
raise StepFunctionsException(
"Node type *%s* for step *%s* "
"is not currently supported by "
"AWS Step Functions." % (node.type, node.name)
)
return workflow
workflow = Workflow(self.name).start_at("start")
if self.workflow_timeout:
workflow.timeout_seconds(self.workflow_timeout)
return _visit(self.graph["start"], workflow)