cicd-deployers/bigquery_ddl_runner.py

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import sys
from collections.abc import Sequence

from google.cloud import bigquery
from google.cloud import storage


def run_sql_queries_from_gcs(project_id, location, bucket, ddl_project_id,
                             ddl_dataset_id, ddl_data_bucket_name,
                             ddl_connection_name):
    """Searches for SQL files in a GCS bucket and runs them in BigQuery.

    Args:
        project_id (str): Google Cloud project ID for the clients.
        location (str): Default location for the BigQuery jobs.
        bucket (str): Name of the GCS bucket that holds the .sql files.
        ddl_project_id (str): Value substituted for ${PROJECT_ID}.
        ddl_dataset_id (str): Value substituted for ${DATASET_ID}.
        ddl_data_bucket_name (str): Value substituted for ${DATA_BUCKET_NAME}.
        ddl_connection_name (str): Value substituted for ${CONNECTION_NAME}.
    """
    bigquery_client = bigquery.Client(project=project_id, location=location)
    storage_client = storage.Client(project=project_id)
    bucket = storage_client.get_bucket(bucket)

    # With delimiter="/", only blobs at the bucket root are returned, so .sql
    # files nested in subfolders are not picked up.
    blobs = bucket.list_blobs(prefix="", delimiter="/")
    for blob in blobs:
        if blob.name.endswith(".sql"):
            file_content = blob.download_as_text()

            # Replace the template variables with the deployment values.
            updated_query = replace_variables_in_query(
                file_content, ddl_project_id, ddl_dataset_id,
                ddl_data_bucket_name, ddl_connection_name)

            # Run the SQL query in BigQuery.
            job_config = bigquery.QueryJobConfig()
            query_job = bigquery_client.query(updated_query,
                                              job_config=job_config)

            # Wait for the query job to complete and print any results.
            results = query_job.result()
            for row in results:
                print(row)


def replace_variables_in_query(file_content, project_id, dataset_id,
                               data_bucket_name, connection_name):
    """Replaces template variables in a BigQuery query string.

    Args:
        file_content (str): The content of the query file.
        project_id (str): Google Cloud project ID.
        dataset_id (str): BigQuery dataset ID.
        data_bucket_name (str): Name of the GCS data bucket.
        connection_name (str): Name of the BigQuery connection.

    Returns:
        str: The updated query string with the variables replaced.
    """
    updated_query = file_content.replace("${PROJECT_ID}", project_id) \
        .replace("${DATASET_ID}", dataset_id) \
        .replace("${DATA_BUCKET_NAME}", data_bucket_name) \
        .replace("${CONNECTION_NAME}", connection_name)
    return updated_query
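
# Illustrative only: a hypothetical DDL file in the bucket might look like the
# template below (the table name and URI are made-up placeholders, not part of
# any real deployment). After replace_variables_in_query substitutes the
# deployment values, the rendered statement is what gets submitted to BigQuery.
#
#   CREATE EXTERNAL TABLE `${PROJECT_ID}.${DATASET_ID}.my_table`
#   WITH CONNECTION `${CONNECTION_NAME}`
#   OPTIONS (
#     format = 'PARQUET',
#     uris = ['gs://${DATA_BUCKET_NAME}/my_table/*.parquet']
#   );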


def main(args: Sequence[str]) -> int:
    """Parses the command-line arguments and calls run_sql_queries_from_gcs
    to execute every .sql file found in the given GCS bucket.
    """
    parser = argparse.ArgumentParser(
        description="Runs BigQuery DDLs defined in .sql files in a GCS bucket")
    parser.add_argument(
        "--project_id", type=str, required=True,
        help="The GCP project ID where the BigQuery and Storage clients will "
             "be created.")
    parser.add_argument(
        "--location", type=str, required=True,
        help="The location used for the BigQuery jobs.")
    parser.add_argument(
        "--bucket", type=str, required=True,
        help="The bucket that holds the DDL files to run.")
    parser.add_argument(
        "--ddl_project_id", type=str, required=True,
        help="The project ID that will be substituted. It should appear in "
             "the .sql file as: ${PROJECT_ID}")
    parser.add_argument(
        "--ddl_dataset_id", type=str, required=True,
        help="The dataset ID that will be substituted. It should appear in "
             "the .sql file as: ${DATASET_ID}")
    parser.add_argument(
        "--ddl_data_bucket_name", type=str, required=True,
        help="The bucket name that will be substituted. It should appear in "
             "the .sql file as: ${DATA_BUCKET_NAME}")
    parser.add_argument(
        "--ddl_connection_name", type=str, required=True,
        help="The BigLake connection name that will be substituted. It should "
             "appear in the .sql file as: ${CONNECTION_NAME}")
    params = parser.parse_args(args)

    run_sql_queries_from_gcs(
        project_id=str(params.project_id),
        location=str(params.location),
        bucket=str(params.bucket),
        ddl_project_id=str(params.ddl_project_id),
        ddl_dataset_id=str(params.ddl_dataset_id),
        ddl_data_bucket_name=str(params.ddl_data_bucket_name),
        ddl_connection_name=str(params.ddl_connection_name))
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))
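
# Example invocation (all values below are placeholders; substitute your own
# project, location, buckets, dataset, and connection):
#
#   python cicd-deployers/bigquery_ddl_runner.py \
#     --project_id my-cicd-project \
#     --location us-central1 \
#     --bucket my-ddl-bucket \
#     --ddl_project_id my-data-project \
#     --ddl_dataset_id my_dataset \
#     --ddl_data_bucket_name my-data-bucket \
#     --ddl_connection_name my-connection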