in pai/pipeline/run.py [0:0]
def wait_for_completion(self, show_outputs=True):
    """Wait until the pipeline run stops.

    While the root node is running, poll the run's node status tree every
    2 seconds. Start a log runner for each node that appears (and is not
    Skipped), and raise as soon as the root node or any node reports Failed.

    Args:
        show_outputs (bool): If True, node logs are tailed through
            ``_RunLogger``; otherwise a ``_MockRunLogger`` is used.

    Returns:
        The run instance itself, both when the run had already succeeded and
        after waiting, so calls can be chained.

    Raises:
        ValueError: If the GetRun response carries no (or an empty)
            ``NodeId``, or the run is in a status that cannot be waited on
            (Initialized, Terminated, Suspended, Failed, Skipped, Unknown).
        PAIException: If the root node or any node fails while waiting.
    """
    run_info = self.get_run_info()
    # Use .get() so a missing key surfaces as the ValueError below instead of
    # a bare KeyError.
    node_id = run_info.get("NodeId")
    if not node_id:
        raise ValueError("Expect NodeId in GetRun response")
    run_status = run_info["Status"]
    if run_status == PipelineRunStatus.Initialized:
        raise ValueError(
            'Pipeline run instance is in status "Init", please start the run instance.'
        )
    elif run_status in (PipelineRunStatus.Terminated, PipelineRunStatus.Suspended):
        raise ValueError(
            "Pipeline run instance is stopped(status:%s), please resume/retry the run."
            % run_status
        )
    elif run_status == PipelineRunStatus.Failed:
        raise ValueError("Pipeline run is failed.")
    elif run_status in (PipelineRunStatus.Skipped, PipelineRunStatus.Unknown):
        raise ValueError(
            "Pipeline run in unexpected status(%s:%s)" % (self.run_id, run_status)
        )
    elif run_status == PipelineRunStatus.Succeeded:
        # Nothing to wait for; return self to match the normal exit path.
        return self

    # Wait for Workflow init.
    print("Wait for run workflow init")
    if show_outputs:
        run_logger = _RunLogger(
            run_instance=self, node_id=node_id, session=self.session
        )
    else:
        run_logger = _MockRunLogger(run_instance=self, node_id=node_id)

    try:
        prev_status_infos = {}
        root_node_status = run_status
        log_runners = []
        while PipelineRunStatus.is_running(root_node_status):
            curr_status_infos = self.travel_node_status_info(node_id)
            # Submit a log runner only for nodes absent from the previous
            # poll, so a node is not tailed again on every iteration.
            for node_fullname, status_info in curr_status_infos.items():
                if (
                    node_fullname not in prev_status_infos
                    and status_info["status"] != PipelineRunStatus.Skipped
                ):
                    log_runner = run_logger.submit(
                        node_id=status_info["nodeId"], node_name=node_fullname
                    )
                    if log_runner:
                        log_runners.append(log_runner)
            prev_status_infos = curr_status_infos

            # The root entry (keyed by self.name) may be missing from a poll,
            # presumably before the workflow is initialized; keep the last
            # known status in that case.
            if self.name in curr_status_infos:
                root_node_status = curr_status_infos[self.name]["status"]

            if root_node_status == PipelineRunStatus.Failed:
                raise PAIException(
                    "PipelineRun failed: run_id={}, run_status_info={}".format(
                        self.run_id, curr_status_infos
                    )
                )
            failed_nodes = {
                name: status_info
                for name, status_info in curr_status_infos.items()
                if PipelineRunStatus.Failed == status_info["status"]
            }
            if failed_nodes:
                raise PAIException(
                    "PipelineRun failed: run_id={}, failed_nodes={}".format(
                        self.run_id, failed_nodes
                    )
                )
            time.sleep(2)

        # Block until every submitted log runner completes. This wait sits
        # inside the try so an interrupt here still stops log tailing.
        for log_runner in log_runners:
            log_runner.result()
    except (KeyboardInterrupt, PAIException):
        # Stop background log tailing before propagating the error.
        run_logger.stop_tail()
        raise
    return self