in jobs/play-store-export/play_store_export/export.py [0:0]
def start_export(project: str, transfer_config_name: str, transfer_location: str,
                 base_date: datetime.date, backfill_day_count: int):
    """
    Start and wait for the completion of a backfill of `backfill_day_count` days,
    counting backwards from `base_date`. The base date is included in the backfill
    and counts as a day in the day count, i.e. `backfill_day_count=1` will backfill
    only `base_date` itself.

    :param project: GCP project id that owns the transfer config
    :param transfer_config_name: name of the BigQuery data transfer config
    :param transfer_location: location of the transfer config (e.g. "us")
    :param base_date: most recent date to backfill (inclusive)
    :param backfill_day_count: total number of days to backfill; must be >= 1
    :raises ValueError: if `backfill_day_count` is not positive
    :raises DataTransferException: if any scheduled transfer run does not succeed
    """
    if backfill_day_count <= 0:
        raise ValueError("Number of days to backfill must be at least 1")

    client = bigquery_datatransfer.DataTransferServiceClient()
    play_store_transfer_config = client.location_transfer_config_path(
        project, transfer_location, transfer_config_name
    )

    # Earliest date in the backfill window (inclusive); base_date counts as day 1.
    oldest_date = base_date - datetime.timedelta(days=backfill_day_count - 1)
    end_date = base_date
    logging.info(f"Backfilling {backfill_day_count} days: {oldest_date} to {base_date}")

    transfer_results = []
    # break backfills into BACKFILL_DAYS_MAX day segments, newest segment first
    while True:
        start_date = max(end_date - datetime.timedelta(days=BACKFILL_DAYS_MAX - 1), oldest_date)
        transfer_runs = trigger_backfill(start_date, end_date,
                                         play_store_transfer_config, client)
        transfer_run_names = [transfer_run.name for transfer_run
                              in sorted(transfer_runs, key=lambda run: run.schedule_time.seconds)]
        end_date = start_date - datetime.timedelta(days=1)
        # wait for backfill to complete
        # days in backfill are scheduled by the transfer service sequentially with 30s in between
        # starting from the latest date but can run in parallel
        transfer_results.extend(map(wait_for_transfer, transfer_run_names))
        if start_date == oldest_date:
            break
        elif start_date < oldest_date:
            # Defensive check: the max() above should make this unreachable.
            # (Message fixed: start_date here is LESS than oldest_date, not greater.)
            raise ValueError("start_date should not be less than oldest_date")

    successes = sum(
        1 for result in transfer_results
        if result == transfer_enums.TransferState.SUCCEEDED
    )
    if len(transfer_results) != successes:
        raise DataTransferException(f"{len(transfer_results) - successes} failed dates")