in pai/pipeline/run.py [0:0]
def travel_node_status_info(self, node_id, max_depth=10):
    """Collect status info for a run node and the sub-pipelines nested under it.

    Starting from ``node_id``, the node detail is fetched through
    ``self.session.pipeline_run_api.get_node`` and every entry listed in its
    ``Spec.Pipelines`` is recorded. Entries whose ``NodeType`` is ``"Dag"``
    and that carry a non-empty ``NodeId`` are visited recursively.

    Keys of the result are dotted, root-first paths: the starting node is
    stored under ``self.name`` and each descendant under
    ``"<parent path>.<node name>"``.

    Args:
        node_id: Identifier of the node the traversal starts from.
        max_depth: Maximum nesting level to visit; the starting node is
            level 1. Deeper DAG nodes are not fetched.

    Returns:
        dict: Maps each dotted node path to whatever
        ``self._pipeline_node_info`` returns for that node.
    """
    node_status_info = {}

    def pipelines_travel(curr_node_id, parent=None, cur_depth=1):
        # Stop descending once the configured nesting limit is exceeded.
        if cur_depth > max_depth:
            return
        run_node_detail_info = self.session.pipeline_run_api.get_node(
            self.run_id,
            curr_node_id,
            depth=2,
        )
        # Nodes without detail, or without a "StartedAt" status entry, are
        # skipped (presumably they have not started yet -- TODO confirm).
        if (
            not run_node_detail_info
            or "StartedAt" not in run_node_detail_info["StatusInfo"]
        ):
            return

        if parent is None:
            curr_root_name = self.name
        else:
            # Parent path first, so this key equals the "<parent>.<child>"
            # key the parent's loop already assigned to this DAG node and
            # the whole result uses one consistent root-first naming.
            curr_root_name = "{0}.{1}".format(
                parent, run_node_detail_info["Metadata"]["Name"]
            )
        node_status_info[curr_root_name] = self._pipeline_node_info(
            run_node_detail_info
        )

        pipelines = run_node_detail_info["Spec"].get("Pipelines", [])
        if not pipelines:
            return
        for sub_pipeline in pipelines:
            node_name = "{0}.{1}".format(
                curr_root_name, sub_pipeline["Metadata"]["Name"]
            )
            node_status_info[node_name] = self._pipeline_node_info(sub_pipeline)
            next_node_id = sub_pipeline["Metadata"]["NodeId"]
            # Only DAG nodes contain further sub-pipelines worth fetching.
            if sub_pipeline["Metadata"]["NodeType"] == "Dag" and next_node_id:
                pipelines_travel(next_node_id, curr_root_name, cur_depth + 1)

    pipelines_travel(curr_node_id=node_id)
    return node_status_info