in common/py_libs/bq_helper.py [0:0]
def load_tables(bq_client: bigquery.Client,
sources: typing.Union[str, typing.List[str]],
target_tables: typing.Union[str, typing.List[str]],
location: str,
continue_if_failed: bool = False,
skip_existing_tables: bool = False,
write_disposition: str = bigquery.WriteDisposition.WRITE_EMPTY,
parallel_jobs: int = 5):
"""Loads data to multiple BigQuery tables.
Args:
bq_client (bigquery.Client): BigQuery client to use.
sources (str | list[str]): data source URI or name.
Supported sources:
- BigQuery table name as project.dataset.table
- Any URI supported by load_table_from_uri
for avro, csv, json and parquet files.
        target_tables (str | list[str]): full target table names as
            "project.dataset.table".
location (str): BigQuery location.
        continue_if_failed (bool): continue loading the remaining tables
            if some jobs fail. Defaults to False.
skip_existing_tables (bool): Skip tables that already exist.
Defaults to False.
        write_disposition (bigquery.WriteDisposition): write disposition.
            Defaults to WRITE_EMPTY (skips loading if the table already
            has data).
parallel_jobs (int): maximum number of parallel jobs. Defaults to 5.
Raises:
ValueError: If the number of source URIs is not equal to the number
of target tables, or if an unsupported source format
is provided.
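
    Example:
        # A minimal sketch; `client`, the bucket and the dataset are
        # hypothetical and must already exist with suitable permissions.
        load_tables(client,
                    sources="gs://my-bucket/data/orders.parquet",
                    target_tables="my-project.my_dataset.orders",
                    location="US")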
"""
    # str is itself an abc.Sequence, so single strings must be tested
    # for explicitly before wrapping scalar arguments into lists.
    if isinstance(sources, str) or not isinstance(sources, abc.Sequence):
        sources = [sources]
    if isinstance(target_tables, str) or not isinstance(
            target_tables, abc.Sequence):
        target_tables = [target_tables]
    if len(target_tables) != len(sources):
        raise ValueError("Number of source URIs must be equal to "
                         "the number of target tables.")
jobs = []
for index, source in enumerate(sources):
target = target_tables[index]
logging.info("Loading table %s from %s.", target, source)
if skip_existing_tables and table_exists(bq_client, target):
logging.warning("⚠️ Table %s already exists. Skipping it.", target)
continue
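        # A URI scheme (e.g. "gs://") marks a file load; anything else is
        # treated as a BigQuery table name and copied within BigQuery.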
if "://" in source:
ext = source.split(".")[-1].lower()
if ext == "avro":
source_format = bigquery.SourceFormat.AVRO
elif ext == "parquet":
source_format = bigquery.SourceFormat.PARQUET
elif ext == "csv":
source_format = bigquery.SourceFormat.CSV
elif ext == "json":
source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
else:
raise ValueError((f"Extension `{ext}` "
"is an unsupported source format."))
job_config = bigquery.LoadJobConfig(
autodetect=True,
source_format=source_format,
write_disposition=write_disposition,
)
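            # Retry transient API errors for up to 60 seconds.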
load_job = bq_client.load_table_from_uri(
source_uris=source,
destination=target,
job_config=job_config,
location=location,
retry=retry.Retry(deadline=60),
)
else:
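            # Source is a BigQuery table name: copy it table-to-table
            # instead of loading from files.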
job_config = bigquery.CopyJobConfig(
write_disposition=write_disposition)
load_job = bq_client.copy_table(source,
target,
location=location,
job_config=job_config,
retry=retry.Retry(deadline=60))
jobs.append(load_job)
        # Once `parallel_jobs` jobs are in flight, wait for the batch.
if len(jobs) >= parallel_jobs:
_wait_for_bq_jobs(jobs, continue_if_failed)
jobs.clear()
    # Wait for any remaining jobs to finish.
_wait_for_bq_jobs(jobs, continue_if_failed)