def export_partitioned_table()

in projects/hive-bigquery-connector-demo/scripts/bq_funcs.py [0:0]

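The function relies on module-level names defined elsewhere in bq_funcs.py (ScriptState, delete_partition_worker, logging_callback, logger). A minimal sketch of the standard-library and third-party imports the function body itself needs, assuming the google-cloud-bigquery client library:

    import multiprocessing
    import time

    from google.cloud import bigquery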

def export_partitioned_table(table_name: str, bucket_name: str):
    """Export a partitioned table to GCS."""
    table = ScriptState.bq_client().get_table(table_name)
    short_table_name = table.table_id
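    # Derive the Hive partition directory field from the table's
    # time-partitioning column (e.g. created_at -> created_date).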
    partition_field = table.time_partitioning.field.replace("_at", "_date")
    partitions = ScriptState.bq_client().list_partitions(table)
    all_jobs = []
    bucket = ScriptState.gcs_client().get_bucket(bucket_name)
    logger.info(
        "Cleaning path gs://%s/data/%s and " "cleaning path gs://%s/backup/%s",
        bucket_name,
        short_table_name,
        bucket_name,
        short_table_name,
    )

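    # Clear any previous export output under data/ and backup/: blob names
    # are pushed onto a queue and deleted in parallel by
    # delete_partition_worker processes.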
    delete_queue = multiprocessing.Queue()

    blobs = list(bucket.list_blobs(prefix=f"data/{short_table_name}")) + list(
        bucket.list_blobs(prefix=f"backup/{short_table_name}")
    )
    logger.info("Found %s blobs to delete", len(blobs))
    if blobs:
        for b in blobs:
            delete_queue.put(b.name)
        workers = []
        for i in range(ScriptState.processes()):
            p = multiprocessing.Process(
                target=delete_partition_worker,
                args=(delete_queue, f"worker-{i}", bucket_name),
            )
            workers.append(p)
            p.start()
        for p in workers:
            p.join()

    # No need for client-side concurrency here - the extract jobs run
    # asynchronously in BigQuery.
    for partition in partitions:
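        # Partition IDs for day-partitioned tables look like YYYYMMDD;
        # rewrite as YYYY-MM-DD for the destination directory name.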
        target_partition = f"{partition[:4]}-{partition[4:6]}-{partition[6:]}"

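        # Extract each partition to two Hive-style destinations: the primary
        # data/ prefix and a backup/ prefix.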
        dest_path = [
            f"gs://{bucket_name}/data/{short_table_name}/{partition_field}="
            f"{target_partition}/*.parquet",
            f"gs://{bucket_name}/backup/{short_table_name}/{partition_field}="
            f"{target_partition}/*.parquet",
        ]
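        # The "$<partition_id>" table decorator restricts the extract job
        # to a single partition.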
        extract_job = ScriptState.bq_client().extract_table(
            f"{table_name}${partition}",
            dest_path,
            job_id_prefix=f"extract-table-{short_table_name}-" f"{partition}",
            location="US",
            job_config=bigquery.ExtractJobConfig(
                destination_format="PARQUET",
            ),
        )
        extract_job.add_done_callback(
            logging_callback(partition, short_table_name)
        )
        all_jobs.append(extract_job)
    logger.info("Waiting for all jobs to finish")
    while not all(job.done() for job in all_jobs):
        time.sleep(1)
    error_jobs = [job for job in all_jobs if job.error_result is not None]
    if error_jobs:
        logger.error("One or more extract jobs failed. Here's one of them:")
        logger.error(error_jobs[0].error_result)
        raise RuntimeError(error_jobs[0].error_result["message"])
    logger.info(
        "Finished extraction of table %s to partition destinations", table_name
    )
    return f"gs://{bucket_name}/data/{short_table_name}/"