tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/utils.py
def external_query( # pylint: disable=too-many-arguments
gcs_client: storage.Client, bq_client: bigquery.Client, gsurl: str,
query: str, job_id: str, table: bigquery.TableReference):
"""Load from query over external table from GCS.
This hinges on a SQL query defined in GCS at _config/*.sql and
an external table definition
_config/{constants.BQ_EXTERNAL_TABLE_CONFIG_FILENAME} (otherwise will assume
PARQUET external table)
"""
external_table_config = read_gcs_file_if_exists(
gcs_client,
f"{gsurl}_config/{constants.BQ_EXTERNAL_TABLE_CONFIG_FILENAME}")
if not external_table_config:
external_table_config = look_for_config_in_parents(
gcs_client, gsurl, constants.BQ_EXTERNAL_TABLE_CONFIG_FILENAME)
if external_table_config:
external_table_def = json.loads(external_table_config)
else:
print(f" {gsurl}_config/{constants.BQ_EXTERNAL_TABLE_CONFIG_FILENAME} "
f"not found in parents of {gsurl}. "
"Falling back to default PARQUET external table: "
f"{json.dumps(constants.DEFAULT_EXTERNAL_TABLE_DEFINITION)}")
external_table_def = constants.DEFAULT_EXTERNAL_TABLE_DEFINITION
print(
json.dumps(
dict(message="Found external table definition.",
table=table.to_api_repr(),
external_table_def=external_table_def)))
# Reduce the number of sourceUris by collapsing common prefixes into
# wildcards. This keeps the Cloud Logging audit metadata below 100 KB;
# above that size the metadata is omitted from the audit log event.
source_uris_with_wildcards = compact_source_uris_with_wildcards(
flatten2dlist(get_batches_for_gsurl(gcs_client, gsurl)))
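# Illustrative compaction (paths are hypothetical):
#   ["gs://bucket/prefix/batch-0/part-0.parquet",
#    "gs://bucket/prefix/batch-0/part-1.parquet"]
#   -> ["gs://bucket/prefix/batch-0/*"]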
# Note: this may still fail if the wildcarded list exceeds BigQuery's
# 10,000 source URI limit.
external_table_def["sourceUris"] = source_uris_with_wildcards
# Check whether hivePartitioningOptions
# https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#hivepartitioningoptions
# is set in the external table definition (external.json).
if external_table_def.get("hivePartitioningOptions"):
external_table_def["hivePartitioningOptions"] = {
"mode":
external_table_def["hivePartitioningOptions"].get("mode")
or "AUTO",
"sourceUriPrefix":
get_hive_partitioning_source_uri_prefix(
external_table_def["sourceUris"][0])
}
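# Illustrative prefix derivation (URIs are hypothetical; exact behavior is
# defined by get_hive_partitioning_source_uri_prefix):
#   sourceUris[0] = "gs://bucket/table/dt=2021-01-01/*"
#   -> sourceUriPrefix = "gs://bucket/table/"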
external_config = bigquery.ExternalConfig.from_api_repr(external_table_def)
job_config = bigquery.QueryJobConfig(
table_definitions={"temp_ext": external_config}, use_legacy_sql=False)
# Drop the partition decorator (e.g. my_table$20210101) if present.
table_id = table.table_id.split("$")[0]
# str.replace gives str.format-like templating without requiring other
# braces in the query (e.g. in a regex) to be escaped.
rendered_query = query.replace(
"{dest_dataset}", f"`{table.project}`.{table.dataset_id}").replace(
"{dest_table}", table_id)
job: bigquery.QueryJob = bq_client.query(rendered_query,
job_config=job_config,
job_id=job_id)
logging.log_bigquery_job(job, table,
f"Submitted asynchronous query job: {job.job_id}")
start_poll_for_errors = time.monotonic()
# Poll briefly so that jobs which fail fast surface their errors here.
while (time.monotonic() - start_poll_for_errors <
       constants.WAIT_FOR_JOB_SECONDS):
job.reload(client=bq_client)
if job.state == "DONE":
check_for_bq_job_and_children_errors(bq_client, job, table)
return
time.sleep(constants.JOB_POLL_INTERVAL_SECONDS)