def execute_query_or_get_status()

in functions/data-processing-engines/bq-saved-query-executor/main.py [0:0]


def execute_query_or_get_status(query_file, file_path, job_id=None):
    """Executes a BigQuery query (if job ID not provided) or gets the status of an existing query.
    Args:
        query_file (str): The Dataform query to execute.
        job_id (str, optional): The ID of an existing BigQuery job. Defaults to None.
    Returns:
        str: The final state of the query job ('DONE', 'FAILED', etc.) or the query job ID if the query times out.
    """
    client = bigquery.Client(project=BIGQUERY_PROJECT)
    if job_id:
        query_job = client.get_job(job_id)
        print(f"Checking status of existing job: {job_id}")
        if query_job.done():
            if query_job.error_result:
                raise BadRequest(query_job.error_result)
            return query_job.state
        else:
            print(f"Query still running in state:{str(query_job.state)}")
            return query_job.state
    else:
        job_id = f"aef_{transform_string(file_path)}_{uuid.uuid4()}"
        job_config = bigquery.QueryJobConfig(
            priority=bigquery.QueryPriority.BATCH
        )
        query_job = client.query(query=query_file, job_config=job_config, job_id=job_id)
        print(f"New query started. Job ID: {query_job.job_id}")
        return query_job.job_id