def load_tables()

in common/py_libs/bq_helper.py [0:0]


def load_tables(bq_client: bigquery.Client,
                sources: typing.Union[str, typing.List[str]],
                target_tables: typing.Union[str, typing.List[str]],
                location: str,
                continue_if_failed: bool = False,
                skip_existing_tables: bool = False,
                write_disposition: str = bigquery.WriteDisposition.WRITE_EMPTY,
                parallel_jobs: int = 5) -> None:
    """Loads data to multiple BigQuery tables.

    Each source is paired with the target table at the same position.
    A source containing "://" is loaded with `load_table_from_uri` (format
    is picked from the file extension, schema is auto-detected); any other
    source is treated as a BigQuery table name and copied with `copy_table`.
    Jobs are started in batches of up to `parallel_jobs`, and each batch is
    waited for before the next one is started.

    Args:
        bq_client (bigquery.Client): BigQuery client to use.
        sources (str | list[str]): data source URI or name.
                               Supported sources:
                                - BigQuery table name as project.dataset.table
                                - Any URI supported by load_table_from_uri
                                  for avro, csv, json and parquet files.
        target_tables (str | list[str]): full target tables names as
                                      "project.dataset.table".
        location (str): BigQuery location.
        continue_if_failed (bool): continue loading tables if some jobs fail.
        skip_existing_tables (bool): Skip tables that already exist.
                                    Defaults to False.
        write_disposition (bigquery.WriteDisposition): write disposition,
                          Defaults to WRITE_EMPTY (skip if has data).
        parallel_jobs (int): maximum number of parallel jobs. Defaults to 5.

    Raises:
        ValueError: If the number of source URIs is not equal to the number
                    of target tables, or if an unsupported source format
                    is provided.
    """

    # `str` is itself an `abc.Sequence`, so it has to be tested for
    # explicitly. Otherwise a single string argument would be left as is,
    # and both the length check and the loop below would operate on its
    # individual characters.
    if isinstance(sources, str) or not isinstance(sources, abc.Sequence):
        sources = [sources]
    if (isinstance(target_tables, str) or
            not isinstance(target_tables, abc.Sequence)):
        target_tables = [target_tables]
    if len(target_tables) != len(sources):
        raise ValueError(("Number of source URIs must be equal to "
                          "number of target tables."))

    # File extension (lower-case) -> BigQuery source format.
    source_formats = {
        "avro": bigquery.SourceFormat.AVRO,
        "parquet": bigquery.SourceFormat.PARQUET,
        "csv": bigquery.SourceFormat.CSV,
        "json": bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    }

    jobs = []
    for source, target in zip(sources, target_tables):
        logging.info("Loading table %s from %s.", target, source)

        if skip_existing_tables and table_exists(bq_client, target):
            logging.warning("⚠️ Table %s already exists. Skipping it.", target)
            continue

        if "://" in source:
            # URI source: the text after the last dot selects the format.
            ext = source.split(".")[-1].lower()
            if ext not in source_formats:
                raise ValueError((f"Extension `{ext}` "
                                 "is an unsupported source format."))
            job_config = bigquery.LoadJobConfig(
                autodetect=True,
                source_format=source_formats[ext],
                write_disposition=write_disposition,
            )
            load_job = bq_client.load_table_from_uri(
                source_uris=source,
                destination=target,
                job_config=job_config,
                location=location,
                retry=retry.Retry(deadline=60),
            )
        else:
            # No URI scheme: the source is a BigQuery table to copy from.
            job_config = bigquery.CopyJobConfig(
                write_disposition=write_disposition)
            load_job = bq_client.copy_table(source,
                                            target,
                                            location=location,
                                            job_config=job_config,
                                            retry=retry.Retry(deadline=60))
        jobs.append(load_job)
        # If reached parallel_jobs number, wait for them to finish.
        if len(jobs) >= parallel_jobs:
            _wait_for_bq_jobs(jobs, continue_if_failed)
            jobs.clear()

    # Wait for the rest of jobs to finish.
    _wait_for_bq_jobs(jobs, continue_if_failed)