wait_for_completion() — method defined in pai/pipeline/run.py


    def wait_for_completion(self, show_outputs=True):
        """Block until the pipeline run reaches a terminal state.

        Polls the run's node-status tree until the root node leaves the
        "running" state, optionally tailing the log output of every node as
        it appears.

        Args:
            show_outputs (bool): If True, tail and print node logs while
                waiting; if False, use a no-op logger.

        Returns:
            The run instance itself (``self``), to allow call chaining.

        Raises:
            ValueError: If the run is not in a waitable state (still in
                Init, stopped, already failed, or in an unknown state).
            PAIException: If the root node or any child node transitions to
                Failed while waiting.
        """
        # Seconds between successive status polls.
        _POLL_INTERVAL_SECONDS = 2

        run_info = self.get_run_info()
        node_id = run_info["NodeId"]
        if not node_id:
            raise ValueError("Expect NodeId in GetRun response")

        # Reject states that can never make progress; succeed fast if the
        # run already finished.
        run_status = run_info["Status"]
        if run_status == PipelineRunStatus.Initialized:
            raise ValueError(
                'Pipeline run instance is in status "Init", please start the run instance.'
            )
        elif run_status in (PipelineRunStatus.Terminated, PipelineRunStatus.Suspended):
            raise ValueError(
                "Pipeline run instance is stopped(status:%s), please resume/retry the run."
                % run_status
            )
        elif run_status == PipelineRunStatus.Failed:
            raise ValueError("Pipeline run is failed.")
        elif run_status in (PipelineRunStatus.Skipped, PipelineRunStatus.Unknown):
            raise ValueError(
                "Pipeline run in unexpected status(%s:%s)" % (self.run_id, run_status)
            )
        elif run_status == PipelineRunStatus.Succeeded:
            return

        # Wait for Workflow init.
        print("Wait for run workflow init")

        # _MockRunLogger mirrors _RunLogger's interface but does nothing, so
        # the polling loop below is identical in both modes.
        if show_outputs:
            run_logger = _RunLogger(
                run_instance=self, node_id=node_id, session=self.session
            )
        else:
            run_logger = _MockRunLogger(run_instance=self, node_id=node_id)

        try:
            prev_status_infos = {}
            root_node_status = run_status
            log_runners = []
            while PipelineRunStatus.is_running(root_node_status):
                curr_status_infos = self.travel_node_status_info(node_id)
                # Start a log tail for each node seen for the first time in
                # this poll (skipped nodes produce no logs).
                for node_fullname, status_info in curr_status_infos.items():
                    if (
                        node_fullname not in prev_status_infos
                        and status_info["status"] != PipelineRunStatus.Skipped
                    ):
                        log_runner = run_logger.submit(
                            node_id=status_info["nodeId"], node_name=node_fullname
                        )
                        if log_runner:
                            log_runners.append(log_runner)
                prev_status_infos = curr_status_infos
                # The root node may be absent from a poll (e.g. during
                # workflow init); keep the last known status in that case.
                root_node_status = (
                    curr_status_infos[self.name]["status"]
                    if self.name in curr_status_infos
                    else root_node_status
                )

                if root_node_status == PipelineRunStatus.Failed:
                    raise PAIException(
                        "PipelineRun failed: run_id={}, run_status_info={}".format(
                            self.run_id, curr_status_infos
                        )
                    )
                # Fail fast when any child node fails, even if the root node
                # has not yet propagated the failure.
                failed_nodes = {
                    name: status_info
                    for name, status_info in curr_status_infos.items()
                    if PipelineRunStatus.Failed == status_info["status"]
                }
                if failed_nodes:
                    raise PAIException(
                        "PipelineRun failed: run_id={}, failed_nodes={}".format(
                            self.run_id, failed_nodes
                        )
                    )

                time.sleep(_POLL_INTERVAL_SECONDS)
        except BaseException:
            # Stop the background log tails on ANY interruption (including
            # KeyboardInterrupt and unexpected transport errors), then
            # re-raise with the original traceback intact.
            run_logger.stop_tail()
            raise

        # Drain the log runners so their output is fully flushed; result()
        # also surfaces any exception raised inside a runner.
        for log_runner in log_runners:
            _ = log_runner.result()

        return self