# projects/hive-bigquery-connector-demo/scripts/bq_funcs.py
import multiprocessing
import time

from google.cloud import bigquery

# logger, ScriptState, delete_partition_worker, and logging_callback are
# defined elsewhere in this module.

def export_partitioned_table(table_name: str, bucket_name: str):
    """Export every partition of a time-partitioned table to GCS as Parquet."""
    table = ScriptState.bq_client().get_table(table_name)
    short_table_name = table.table_id
    # Derive the Hive-style partition column from the BigQuery partitioning
    # field, e.g. "created_at" -> "created_date".
    partition_field = table.time_partitioning.field.replace("_at", "_date")
    # list_partitions returns partition IDs as strings, e.g. "20240115".
    partitions = ScriptState.bq_client().list_partitions(table)
    all_jobs = []
    bucket = ScriptState.gcs_client().get_bucket(bucket_name)
    logger.info(
        "Cleaning paths gs://%s/data/%s and gs://%s/backup/%s",
        bucket_name,
        short_table_name,
        bucket_name,
        short_table_name,
    )
    delete_queue = multiprocessing.Queue()
    blobs = list(bucket.list_blobs(prefix=f"data/{short_table_name}")) + list(
        bucket.list_blobs(prefix=f"backup/{short_table_name}")
    )
    logger.info("Found %s blobs to delete", len(blobs))
    if blobs:
        for b in blobs:
            delete_queue.put(b.name)
        workers = []
        for i in range(ScriptState.processes()):
            p = multiprocessing.Process(
                target=delete_partition_worker,
                args=(delete_queue, f"worker-{i}", bucket_name),
            )
            workers.append(p)
            p.start()
        for p in workers:
            p.join()
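    # The delete workers above are assumed to drain blob names from the queue
    # and delete them until it is empty; delete_partition_worker itself is
    # defined elsewhere in this module (a hypothetical sketch appears at the
    # bottom of this file).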
    # No need for concurrency here - this is mostly handled by bq
    for partition in partitions:
        # Reformat the partition ID ("YYYYMMDD") as an ISO date ("YYYY-MM-DD").
        target_partition = f"{partition[:4]}-{partition[4:6]}-{partition[6:]}"
        dest_path = [
            f"gs://{bucket_name}/data/{short_table_name}/{partition_field}="
            f"{target_partition}/*.parquet",
            f"gs://{bucket_name}/backup/{short_table_name}/{partition_field}="
            f"{target_partition}/*.parquet",
        ]
        extract_job = ScriptState.bq_client().extract_table(
            f"{table_name}${partition}",
            dest_path,
            job_id_prefix=f"extract-table-{short_table_name}-{partition}",
            location="US",
            job_config=bigquery.ExtractJobConfig(
                destination_format="PARQUET",
            ),
        )
        extract_job.add_done_callback(
            logging_callback(partition, short_table_name)
        )
        all_jobs.append(extract_job)
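    # logging_callback (defined elsewhere in this module) is assumed to be a
    # factory returning a closure that receives each finished job; the done
    # callbacks fire as extracts complete, while the loop below polls for
    # overall completion.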
logger.info("Waiting for all jobs to finish")
while not all(map(lambda job: job.done(), all_jobs)):
time.sleep(1)
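    # job.done() refreshes an unfinished job's state from the API on each
    # call, so the one-second sleep keeps this polling loop from hammering
    # the service.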
    error_jobs = list(
        filter(lambda job: job.error_result is not None, all_jobs)
    )
    if error_jobs:
        logger.error("One or more jobs have failed. Here's one of them:")
        logger.error(error_jobs[0].error_result)
        raise RuntimeError(error_jobs[0].error_result["message"])
    logger.info(
        "Finished extraction of table %s to partition destinations", table_name
    )
    return f"gs://{bucket_name}/data/{short_table_name}/"