def main()

in functions/data-processing-engines/dataform-tag-executor/main.py [0:0]


def main(request):
    """
    Main function, likely triggered by an HTTP request. Extracts parameters, reads a repository from
    Dataform, executes the file's contents as a BigQuery query, and reports the result status or job ID.

    Args:
        request: The incoming HTTP request object.

    Returns:
        str: The status of the query execution or the job ID (if asynchronous).
    """

    request_json = request.get_json(silent=True)
    print("event:" + str(request_json))

    try:
        job_name = request_json.get('job_name', None)
        workflow_name = request_json.get('workflow_name', None)

        jobs_definitions_bucket = request_json.get("workflow_properties", {}).get("jobs_definitions_bucket")
        repository_name = None
        tags = None
        branch = None
        dataform_location = None
        dataform_project_id = None

        if jobs_definitions_bucket:
            extracted_params = extract_params(
                bucket_name=jobs_definitions_bucket,
                job_name=job_name,
                function_name=function_name
            )
            repository_name = extracted_params.get("repository_name")
            tags = extracted_params.get("tags")
            branch = extracted_params.get("branch")
            dataform_location = extracted_params.get("dataform_location")
            dataform_project_id = extracted_params.get("dataform_project_id")

        job_id = request_json.get('job_id', None)
        query_variables = request_json.get('query_variables', None)

        status_or_job_id = run_repo_or_get_status(job_id, gcp_project=dataform_project_id, location=dataform_location,
                                                  repo_name=repository_name, tags=tags, branch=branch,
                                                  query_variables=query_variables)

        if status_or_job_id.startswith('aef_'):
            print(f"Running Query, track it with Job ID: {status_or_job_id}")
        else:
            print(f"Query finished with status: {status_or_job_id}")

        return status_or_job_id
    except Exception as error:
        err_message = "Exception: " + repr(error)
        response = {
            "error": error.__class__.__name__,
            "message": repr(err_message)
        }
        return response