in tfx_airflow/notebooks/utils.py [0:0]
def _add_parents(self, g, node_id, is_artifact, depth, max_depth=None):
    """Recursively links `node_id` to its upstream parents inside graph `g`.

    Artifacts and executions share one graph: an artifact is keyed by its id
    as-is, an execution by its id multiplied by -1. Edges always point from
    the parent to the child.

    Args:
      g: Graph being populated; it is queried with `in` / `in_edges` and
        extended with `add_edge`.
      node_id: Id of the artifact or execution whose parents are added.
      is_artifact: Truthy when `node_id` names an artifact, falsy when it
        names an execution.
      depth: Depth recorded for this node; each parent is visited with
        `depth + 1`.
      max_depth: Optional limit. A node whose depth exceeds it still gets its
        attributes recorded but is not expanded further.
    """
    # Executions are stored under the negated id so they cannot collide with
    # artifact ids in the same graph.
    if is_artifact:
        graph_key = node_id
    else:
        graph_key = -1 * node_id

    # Attributes are recorded before any bail-out below, so nodes that are
    # revisited or lie past the depth limit are still annotated.
    self._add_node_attribute(g, node_id, depth, is_artifact)

    # Stop when the node already has incoming edges (its parents were linked
    # on an earlier visit) or when the depth limit has been exceeded.
    if (graph_key in g and g.in_edges(graph_key)) or (
            max_depth is not None and depth > max_depth):
        return

    parent_depth = depth + 1

    if is_artifact:
        # Parents of an artifact are executions.
        for exec_id, cached in self._get_upstream_execution_ids(node_id):
            g.add_edge(-1 * exec_id, graph_key, is_cached=cached)
            self._add_parents(g, exec_id, False, parent_depth, max_depth)
        return

    # Parents of an execution are artifacts.
    for artifact_id in self._get_upstream_artifact_ids(node_id):
        g.add_edge(artifact_id, graph_key, is_cached=False)
        self._add_parents(g, artifact_id, True, parent_depth, max_depth)