def main()

in functions/data-processing-engines/dataflow-flextemplate-job-executor/main.py [0:0]


def main(request):
    """
    Cloud Function entry point for handling Dataflow job requests.

    Parses the JSON payload of the incoming HTTP request, derives a sanitized
    Dataflow job name from ``job_name`` and delegates the actual work to
    ``run_dataflow_job_or_get_status``. The whole payload is forwarded to that
    helper unchanged.

    Args:
        request: The incoming HTTP request object. Must expose
            ``get_json(silent=True)``. Expected to carry a JSON object with:
            - workflow_properties: (Required) A dictionary with the Dataflow
              job configuration. It is only validated here; its contents are
              presumably consumed by ``run_dataflow_job_or_get_status``
              (verify against that helper). Documented keys:
                - dataflow_location: The GCP region for the Dataflow job.
                - dataflow_project_id: The GCP project ID for the Dataflow job.
                - dataflow_template_gcs_path: The GCS path to the Flex Template.
                - dataflow_job_name: The name to assign to the Dataflow job.
                - dataflow_job_params: (Optional) Parameters for the job.
            - workflow_name: The name of the workflow triggering the job
              (not read by this function).
            - job_name: A unique identifier for the job within the workflow.
              Used to derive the sanitized Dataflow job name.
            - job_id: (Optional) The ID of an existing Dataflow job.

    Returns:
        str | dict:
            - On success: the string returned by
              ``run_dataflow_job_or_get_status``. A value starting with
              ``"aef_"`` is treated as a job ID of a launched job; anything
              else is treated as a job status.
            - On any error: a dict with ``"error"`` (exception class name) and
              ``"message"`` (``"Exception: "`` followed by the exception repr).
              Exceptions are never propagated to the caller.
    """
    request_json = request.get_json(silent=True)
    print("event:" + str(request_json))

    try:
        # Fail early with a clear message instead of an incidental
        # AttributeError on ``None.get`` when the body is missing or malformed.
        if not isinstance(request_json, dict):
            raise ValueError("Request body must be a JSON object")
        if not isinstance(request_json.get('workflow_properties'), dict):
            raise ValueError("'workflow_properties' must be a JSON object")

        job_name = request_json.get("job_name", "")
        # Keep only lowercase letters, digits and literal '+' characters, then
        # drop any leading digits so the derived name does not start with one.
        dataflow_job_name = re.sub(r"^\d+", "", re.sub(r"[^a-z0-9+]", "", job_name))

        job_id = request_json.get('job_id', None)

        status_or_job_id = run_dataflow_job_or_get_status(job_id,
                                                          dataflow_job_name=dataflow_job_name,
                                                          job_name=job_name,
                                                          request_json=request_json)

        if status_or_job_id.startswith('aef_'):
            print(f"Running Dataflow Job, track it with Job ID: {status_or_job_id}")
        else:
            print(f"Dataflow Job with status: {status_or_job_id}")

        return status_or_job_id
    except Exception as error:
        # Top-level boundary: report the failure in the response body rather
        # than raising, and log it so it is visible in the function logs.
        err_message = "Exception: " + repr(error)
        print(err_message)
        return {
            "error": error.__class__.__name__,
            # The message is already a formatted string; do not repr() it
            # again, which would wrap it in an extra layer of quotes.
            "message": err_message
        }