in jobs/play-store-export/play_store_export/export.py [0:0]
def wait_for_transfer(transfer_name: str, timeout: int = 1200, polling_period: int = 20) -> int:
"""
Continuously poll for run status to wait for completion
"""
client = bigquery_datatransfer.DataTransferServiceClient()
state = transfer_enums.TransferState.PENDING
time_elapsed = 0
while (state == transfer_enums.TransferState.PENDING or
state == transfer_enums.TransferState.RUNNING):
try:
transfer_run = client.get_transfer_run(transfer_name)
except GoogleAPICallError as e:
# grpc errors are not serializable and cannot be raised in multiprocessing
raise DataTransferException(f"Error getting transfer run: {e.message}")
run_date = datetime.datetime.utcfromtimestamp(transfer_run.run_time.seconds).date()
state = transfer_run.state
if not (state == transfer_enums.TransferState.PENDING or
state == transfer_enums.TransferState.RUNNING):
break
if time_elapsed >= timeout:
logging.info(f"Transfer for {run_date} did not complete in {timeout} seconds")
return -1
time.sleep(polling_period)
time_elapsed += polling_period
if state == transfer_enums.TransferState.SUCCEEDED:
result = "succeeded"
elif state == transfer_enums.TransferState.CANCELLED:
result = "cancelled"
elif state == transfer_enums.TransferState.FAILED:
result = "failed"
else:
result = "unspecified"
logging.info(f"Transfer for {run_date} {result}")
return state