def wait_stage_complete()

in fbpcs/pl_coordinator/pl_instance_runner.py [0:0]


    def wait_stage_complete(self, stage: PrivateComputationBaseStageFlow) -> None:
        """Poll publisher and partner until ``stage`` is complete on both sides.

        Every iteration refreshes both instances, logs their statuses, and then
        sleeps ``POLL_INTERVAL`` seconds before polling again.

        If the publisher has failed (its status is the stage's failed status or
        ``PrivateComputationInstanceStatus.TIMEOUT``) while the partner is still
        in the stage's started status, the partner is given roughly
        ``CANCEL_STAGE_TIMEOUT`` seconds (counted in ``POLL_INTERVAL`` steps) to
        fail on its own. If it has not, the partner's current stage is cancelled
        exactly once, and the following poll raises.

        Args:
            stage: stage flow whose ``started_status``, ``completed_status``,
                ``failed_status`` and ``timeout`` drive the wait.

        Raises:
            PLInstanceCalculationException: if the stage fails on either side,
                or if it is not complete on both sides within ``stage.timeout``
                seconds.
        """
        start_status = stage.started_status
        complete_status = stage.completed_status
        fail_status = stage.failed_status
        # A publisher-side TIMEOUT is treated the same as a stage failure.
        publisher_fail_statuses = (
            fail_status,
            PrivateComputationInstanceStatus.TIMEOUT,
        )
        timeout = stage.timeout

        start_time = time()
        # Seconds (accumulated in POLL_INTERVAL steps) spent waiting for the
        # partner to fail on its own after the publisher has failed.
        cancel_time = 0
        # Ensures cancel_current_stage() is issued at most once.
        partner_cancelled = False
        while time() < start_time + timeout:
            self.publisher.update_instance()
            self.partner.update_instance()
            self.logger.info(
                f"Publisher status: {self.publisher.status}. Partner status: {self.partner.status}."
            )
            if (
                self.publisher.status is complete_status
                and self.partner.status is complete_status
            ):
                self.logger.info(f"Stage {stage.name} is complete.")
                return

            publisher_failed = self.publisher.status in publisher_fail_statuses
            if publisher_failed or self.partner.status is fail_status:
                if (
                    publisher_failed
                    and self.partner.status is start_status
                    and not partner_cancelled
                ):
                    # Give the partner up to CANCEL_STAGE_TIMEOUT seconds to
                    # reach the failed status on its own; after that, cancel its
                    # stage exactly once. `>=` (not `==`) guarantees the cancel
                    # still fires when CANCEL_STAGE_TIMEOUT is not an exact
                    # multiple of POLL_INTERVAL.
                    if cancel_time >= CANCEL_STAGE_TIMEOUT:
                        self.logger.error(f"Canceling partner stage {stage.name}.")
                        self.partner.cancel_current_stage()
                        partner_cancelled = True
                    else:
                        self.logger.info(
                            f"Waiting to cancel partner stage {stage.name}."
                        )
                    cancel_time += POLL_INTERVAL
                else:
                    # Either the partner itself failed, the partner is in some
                    # other non-started status, or the one-time cancel has
                    # already been issued: give up.
                    raise PLInstanceCalculationException(
                        f"Stage {stage.name} failed. Publisher status: {self.publisher.status}. Partner status: {self.partner.status}."
                    )
            sleep(POLL_INTERVAL)
        raise PLInstanceCalculationException(
            f"Stage {stage.name} timed out after {timeout}s. Publisher status: {self.publisher.status}. Partner status: {self.partner.status}."
        )