def external_query()

in tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/utils.py [0:0]


def external_query(  # pylint: disable=too-many-arguments
        gcs_client: storage.Client, bq_client: bigquery.Client, gsurl: str,
        query: str, job_id: str, table: bigquery.TableReference):
    """Load from query over external table from GCS.

    This hinges on a SQL query defined in GCS at _config/*.sql and
    an external table definition
    _config/{constants.BQ_EXTERNAL_TABLE_CONFIG_FILENAME} (otherwise will assume
    PARQUET external table)
    """
    external_table_config = read_gcs_file_if_exists(
        gcs_client,
        f"{gsurl}_config/{constants.BQ_EXTERNAL_TABLE_CONFIG_FILENAME}")
    if not external_table_config:
        external_table_config = look_for_config_in_parents(
            gcs_client, gsurl, constants.BQ_EXTERNAL_TABLE_CONFIG_FILENAME)
    if external_table_config:
        external_table_def = json.loads(external_table_config)
    else:
        print(f" {gsurl}_config/{constants.BQ_EXTERNAL_TABLE_CONFIG_FILENAME} "
              f"not found in parents of {gsurl}. "
              "Falling back to default PARQUET external table: "
              f"{json.dumps(constants.DEFAULT_EXTERNAL_TABLE_DEFINITION)}")
        external_table_def = constants.DEFAULT_EXTERNAL_TABLE_DEFINITION
    print(
        json.dumps(
            dict(message="Found external table definition.",
                 table=table.to_api_repr(),
                 external_table_def=external_table_def)))
    # Reduce the number of sourceUris by using wildcards over common
    # prefixes. This keeps the Cloud Logging audit metadata below 100k in
    # size; otherwise the metadata is omitted from the event.
    source_uris_with_wildcards = compact_source_uris_with_wildcards(
        flatten2dlist(get_batches_for_gsurl(gcs_client, gsurl)))
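    # Illustrative example (hypothetical paths): gs://bucket/dir/a.parquet
    # and gs://bucket/dir/b.parquet could be compacted to the single
    # wildcard URI gs://bucket/dir/*.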
    # BigQuery caps external tables at 10,000 source URIs, so this may still
    # fail if compaction leaves more than 10,000 entries.
    external_table_def["sourceUris"] = source_uris_with_wildcards
    # Check whether hivePartitioningOptions
    # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#hivepartitioningoptions
    # is set in the external table config file.
    if external_table_def.get("hivePartitioningOptions"):
        external_table_def["hivePartitioningOptions"] = {
            "mode":
                external_table_def["hivePartitioningOptions"].get("mode")
                or "AUTO",
            "sourceUriPrefix":
                get_hive_partitioning_source_uri_prefix(
                    external_table_def["sourceUris"][0])
        }
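        # Illustrative note (hypothetical paths): for a source URI like
        # gs://bucket/table/dt=2021-06-01/part-000.parquet, a sourceUriPrefix
        # of gs://bucket/table/ lets BigQuery infer dt as a partition key.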
    external_config = bigquery.ExternalConfig.from_api_repr(external_table_def)
    job_config = bigquery.QueryJobConfig(
        table_definitions={"temp_ext": external_config}, use_legacy_sql=False)

    # Drop the partition decorator if present.
    table_id = table.table_id.split("$")[0]
    # Similar syntax to str.format, but doesn't require escaping braces
    # elsewhere in the query (e.g. in a regex).
    rendered_query = query.replace(
        "{dest_dataset}", f"`{table.project}`.{table.dataset_id}").replace(
            "{dest_table}", table_id)
    job: bigquery.QueryJob = bq_client.query(rendered_query,
                                             job_config=job_config,
                                             job_id=job_id)
    logging.log_bigquery_job(job, table,
                             f"Submitted asynchronous query job: {job.job_id}")
    start_poll_for_errors = time.monotonic()
    # Check if job failed quickly
    while (time.monotonic() - start_poll_for_errors <
           constants.WAIT_FOR_JOB_SECONDS):
        job.reload(client=bq_client)
        if job.state == "DONE":
            check_for_bq_job_and_children_errors(bq_client, job, table)
            return
        time.sleep(constants.JOB_POLL_INTERVAL_SECONDS)
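
A minimal usage sketch (the bucket, project, dataset, table, and job id below
are illustrative assumptions, not values taken from this module):

    from google.cloud import bigquery, storage

    gcs_client = storage.Client()
    bq_client = bigquery.Client()

    # A _config/*.sql query typically reads from the temp_ext table
    # definition registered by external_query and writes to the destination
    # through the {dest_dataset} / {dest_table} template variables.
    example_query = "INSERT {dest_dataset}.{dest_table} SELECT * FROM temp_ext"

    external_query(
        gcs_client,
        bq_client,
        "gs://example-ingest-bucket/my_dataset/my_table/",  # gsurl (hypothetical)
        example_query,
        "example-job-id-000",  # job_id (hypothetical)
        bigquery.TableReference.from_string(
            "example-project.my_dataset.my_table"),
    )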