in tfx/orchestration/portable/input_resolution/input_graph_resolver.py [0:0]
def _topologically_sorted_node_ids(
        input_graph: pipeline_pb2.InputGraph) -> Iterable[str]:
    """Yields the ids of InputGraph.nodes in topologically sorted order.

    Dependencies are collected per node kind: an op_node depends on every
    positional or keyword argument that is a `node_id`, a dict_node depends on
    all values of its `node_ids` map, and an input_node depends on nothing.

    This is a generator, so the errors below surface when it is first iterated
    rather than when it is called.

    Args:
      input_graph: The InputGraph whose `nodes` map should be ordered.

    Yields:
      Node ids, flattened layer by layer from `topsort.topsorted_layers`.

    Raises:
      exceptions.UnimplementedError: A node's `kind` is not one of op_node,
        dict_node or input_node.
      exceptions.FailedPreconditionError: `topsort` reported an invalid DAG.
    """
    upstream_ids = collections.defaultdict(list)
    downstream_ids = collections.defaultdict(list)

    for node_id, node_def in input_graph.nodes.items():
        kind = node_def.WhichOneof('kind')
        if kind == 'input_node':
            # InputNode does not have further dependency within the graph.
            continue
        if kind == 'dict_node':
            upstream_ids[node_id].extend(
                node_def.dict_node.node_ids.values())
            continue
        if kind != 'op_node':
            raise exceptions.UnimplementedError(
                f'InputGraph node {node_id} has unknown kind {kind}.')

        op_node = node_def.op_node
        referenced_ids = [
            arg.node_id
            for arg in (*op_node.args, *op_node.kwargs.values())
            if arg.WhichOneof('kind') == 'node_id'
        ]
        # Only touch the defaultdict when there is something to record, so an
        # op node without node_id arguments gets no (empty) entry of its own.
        if referenced_ids:
            upstream_ids[node_id].extend(referenced_ids)

    # Invert the parent edges to obtain the child edges.
    for child_id, parent_ids in upstream_ids.items():
        for parent_id in parent_ids:
            downstream_ids[parent_id].append(child_id)

    try:
        # Looking up an id with no recorded edges in a defaultdict(list)
        # simply returns a fresh empty list.
        layers = topsort.topsorted_layers(
            list(input_graph.nodes.keys()),
            get_node_id_fn=lambda x: x,
            get_parent_nodes=upstream_ids.__getitem__,
            get_child_nodes=downstream_ids.__getitem__)
    except topsort.InvalidDAGError as e:
        raise exceptions.FailedPreconditionError(
            f'InputGraph has a cycle. parents = {upstream_ids}.') from e

    for layer in layers:
        yield from layer