functions/data-processing-engines/bq-saved-query-executor/main.py (88 lines of code) (raw):

# Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the 'License'); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an 'AS IS' BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import google.auth import functions_framework import grpc import requests import base64 import uuid import re import os from google.cloud import bigquery, dataform_v1beta1, resourcemanager_v3 from google.api_core.exceptions import BadRequest from google.auth.transport.requests import Request # --- Authentication Setup --- credentials, project = google.auth.default() BIGQUERY_PROJECT = os.environ.get('BIGQUERY_PROJECT') @functions_framework.http def main(request): """ Main function, likely triggered by an HTTP request. Extracts parameters, reads a BigQuery saved query (backed by Dataform),executes the file's contents as a BigQuery query, and reports the result status or job ID. Args: request: The incoming HTTP request object. Returns: str: The status of the query execution or the job ID (if asynchronous). """ request_json = request.get_json(silent=True) print("event:" + str(request_json)) try: dataform_location = request_json['workflow_properties']['dataform_location'] dataform_project_id = request_json['workflow_properties']['dataform_project_id'] repository_name = request_json['workflow_properties']['repository_name'] workflow_name = request_json['workflow_name'] job_name = request_json['job_name'] file_path = f"definitions/{workflow_name}/{job_name}.sqlx" job_id = request_json.get('job_id', None) query_variables = request_json.get('query_variables', None) query_file = read_file(dataform_project_id, dataform_location, repository_name, file_path, query_variables) status_or_job_id = execute_query_or_get_status(query_file, file_path, job_id) if status_or_job_id.startswith('aef_'): print(f"Running Query, track it with Job ID: {status_or_job_id}") else: print(f"Query finished with status: {status_or_job_id}") return status_or_job_id except Exception as error: err_message = "Exception: " + repr(error) response = { "error": error.__class__.__name__, "message": repr(error) } return response def read_file(project_id, location, repository_name, file_path, query_variables): """ Reads a file from a Google Dataform repository and optionally replaces variables. Args: project_id (str): The Google Cloud project ID. location (str): The Dataform repository's location. repository_name (str): The name of the Dataform repository. file_path (str): The path to the file within the repository. query_variables (dict): A dictionary for variable replacement (optional). Returns: str: The file's contents if successful, otherwise None. """ credentials.refresh(Request()) headers = {"Authorization": f"Bearer {credentials.token}"} url = (f"https://dataform.googleapis.com/v1beta1/projects/{project_id}/" f"locations/{location}/repositories/{repository_name}:" f"readFile?path={file_path}") response = requests.get(url, headers=headers) if response.status_code == 200: file_contents = base64.b64decode(response.json()["contents"]).decode('utf-8').lstrip("-n") if query_variables: file_contents = replace_variables(file_contents, query_variables) if file_contents.startswith("config"): file_contents = file_contents.split("\n", 3)[3] return file_contents else: error_message = f"Dataform API request failed. Status code:{response.status_code}" print(error_message) print(response.text) raise Exception(error_message) def execute_query_or_get_status(query_file, file_path, job_id=None): """Executes a BigQuery query (if job ID not provided) or gets the status of an existing query. Args: query_file (str): The Dataform query to execute. job_id (str, optional): The ID of an existing BigQuery job. Defaults to None. Returns: str: The final state of the query job ('DONE', 'FAILED', etc.) or the query job ID if the query times out. """ client = bigquery.Client(project=BIGQUERY_PROJECT) if job_id: query_job = client.get_job(job_id) print(f"Checking status of existing job: {job_id}") if query_job.done(): if query_job.error_result: raise BadRequest(query_job.error_result) return query_job.state else: print(f"Query still running in state:{str(query_job.state)}") return query_job.state else: job_id = f"aef_{transform_string(file_path)}_{uuid.uuid4()}" job_config = bigquery.QueryJobConfig( priority=bigquery.QueryPriority.BATCH ) query_job = client.query(query=query_file, job_config=job_config, job_id=job_id) print(f"New query started. Job ID: {query_job.job_id}") return query_job.job_id def transform_string(text): """ Transforms a string by removing non-alphanumeric characters (except spaces and hyphens) and replacing spaces with underscores, then trims any leading or trailing underscores or hyphens. Args: text (str): The input string to transform. Returns: str: The transformed string. """ temp_text = re.sub(r"[^\w\s-]", " ", text) temp_text = re.sub(r"\s+", "_", temp_text) transformed_text = temp_text.strip("_-") return transformed_text def replace_variables(file_contents, query_variables): """ Replaces variables in a string with their corresponding values from a dictionary. Args: file_contents (str): The string containing the variables to be replaced. query_variables (dict): A dictionary mapping variable names to their values. Returns: str: The string with the variables replaced. """ for key, value in query_variables.items(): file_contents = file_contents.replace(key, f"'{value}'") return file_contents