in fbpcs/pl_coordinator/pl_instance_runner.py [0:0]
def wait_stage_complete(self, stage: PrivateComputationBaseStageFlow) -> None:
    """Poll publisher and partner until both finish ``stage``, or fail.

    Each poll refreshes both instances, logs their statuses, and then:

    * returns when both sides report ``stage.completed_status``;
    * raises when the partner reports ``stage.failed_status``, or when the
      publisher has failed/timed out and the partner is not (or is no
      longer allowed to be) in ``stage.started_status``;
    * otherwise, if the publisher has failed/timed out while the partner is
      still in ``stage.started_status``, keeps polling so the partner can
      fail on its own. Once the accumulated wait reaches
      ``CANCEL_STAGE_TIMEOUT`` the partner stage is cancelled exactly once;
      if the partner is still in the started status on the following poll,
      the failure is raised.

    Args:
        stage: Stage flow whose ``started_status``, ``completed_status``,
            ``failed_status``, ``timeout`` and ``name`` drive the wait.

    Raises:
        PLInstanceCalculationException: If the stage fails, or if
            ``stage.timeout`` seconds elapse before both sides complete.
    """
    start_status = stage.started_status
    complete_status = stage.completed_status
    fail_status = stage.failed_status
    timeout = stage.timeout
    publisher_failure_statuses = (
        fail_status,
        PrivateComputationInstanceStatus.TIMEOUT,
    )

    start_time = time()
    # Grace period granted so far, accumulated in POLL_INTERVAL steps (only
    # polls that actually hit the "publisher failed, partner still running"
    # case count towards it).
    cancel_wait = 0
    # Guarantees the partner stage is cancelled at most once.
    cancel_requested = False

    while time() < start_time + timeout:
        self.publisher.update_instance()
        self.partner.update_instance()
        self.logger.info(
            f"Publisher status: {self.publisher.status}. Partner status: {self.partner.status}."
        )

        if (
            self.publisher.status is complete_status
            and self.partner.status is complete_status
        ):
            self.logger.info(f"Stage {stage.name} is complete.")
            return

        publisher_failed = self.publisher.status in publisher_failure_statuses
        if publisher_failed or self.partner.status is fail_status:
            partner_may_still_fail = (
                publisher_failed
                and self.partner.status is start_status
                and not cancel_requested
            )
            if not partner_may_still_fail:
                raise PLInstanceCalculationException(
                    f"Stage {stage.name} failed. Publisher status: {self.publisher.status}. Partner status: {self.partner.status}."
                )

            # Compare with >= rather than ==: the counter advances in
            # POLL_INTERVAL steps, so an exact-equality check would skip the
            # cancel entirely whenever CANCEL_STAGE_TIMEOUT is not a multiple
            # of POLL_INTERVAL.
            if cancel_wait >= CANCEL_STAGE_TIMEOUT:
                self.logger.error(f"Canceling partner stage {stage.name}.")
                self.partner.cancel_current_stage()
                cancel_requested = True
            else:
                self.logger.info(
                    f"Waiting to cancel partner stage {stage.name}."
                )
                cancel_wait += POLL_INTERVAL

        sleep(POLL_INTERVAL)

    raise PLInstanceCalculationException(
        f"Stage {stage.name} timed out after {timeout}s. Publisher status: {self.publisher.status}. Partner status: {self.partner.status}."
    )