in functions/data-processing-engines/dataflow-flextemplate-job-executor/main.py [0:0]
def main(request):
    """
    Cloud Function entry point for handling Dataflow job requests.

    Processes an incoming HTTP request describing a Dataflow Flex Template job
    and either launches a new Dataflow job or retrieves the status of an
    existing one, via ``run_dataflow_job_or_get_status``.

    Args:
        request: The incoming HTTP request object. Its JSON body must be an
            object with the following keys:
            - workflow_properties: (Required) A JSON object with the Dataflow
              job configuration. This function only checks that it is present
              and is an object; the whole payload (including it) is forwarded
              to ``run_dataflow_job_or_get_status``. Expected keys:
                - dataflow_location: The GCP region for the Dataflow job.
                - dataflow_project_id: The GCP project ID for the Dataflow job.
                - dataflow_template_gcs_path: The GCS path to the Dataflow Flex Template.
                - dataflow_job_name: The name to assign to the Dataflow job.
                - dataflow_job_params: (Optional) A dictionary of parameters for the Dataflow job.
            - workflow_name: The name of the workflow triggering the Dataflow job
              (not read here; forwarded as part of the payload).
            - job_name: A unique identifier for the job within the workflow.
              A sanitized copy is passed on as ``dataflow_job_name``.
            - job_id: (Optional) The ID of an existing Dataflow job (if checking status).

    Returns:
        str | dict:
            - If launching a new job: the Dataflow job ID (prefixed with "aef_").
            - If getting job status: the current state of the Dataflow job
              (e.g., "JOB_STATE_RUNNING").
            - If any error occurs (including a malformed request): a dict with
              keys "error" (the exception class name) and "message"
              ("Exception: " followed by the exception's repr).
    """
    request_json = request.get_json(silent=True)
    print("event:" + str(request_json))
    try:
        # get_json(silent=True) yields None for a missing/unparseable body, and a
        # JSON array parses to a list; reject both explicitly instead of failing
        # later with an opaque AttributeError.
        if not isinstance(request_json, dict):
            raise ValueError("Request body must be a JSON object")
        if not isinstance(request_json.get("workflow_properties"), dict):
            raise ValueError("workflow_properties must be a JSON object")

        job_name = request_json.get("job_name", "")
        job_id = request_json.get("job_id", None)

        # Dataflow job names may only contain [-a-z0-9] and must start with a
        # letter: drop every character outside [a-z0-9], then strip any leading
        # digits so the name starts with a letter.
        # NOTE(review): uppercase letters are dropped rather than lowercased, and
        # the result can be empty (e.g. job_name="123") — confirm this is intended.
        dataflow_job_name = re.sub(r"^\d+", "", re.sub(r"[^a-z0-9]+", "", job_name))

        status_or_job_id = run_dataflow_job_or_get_status(
            job_id,
            dataflow_job_name=dataflow_job_name,
            job_name=job_name,
            request_json=request_json,
        )
        # A newly launched job comes back as an "aef_"-prefixed job ID;
        # anything else is treated as a job state string.
        if status_or_job_id.startswith("aef_"):
            print(f"Running Dataflow Job, track it with Job ID: {status_or_job_id}")
        else:
            print(f"Dataflow Job with status: {status_or_job_id}")
        return status_or_job_id
    except Exception as error:
        # Top-level boundary: turn any failure into an error payload for the
        # caller instead of propagating the exception.
        err_message = "Exception: " + repr(error)
        return {
            "error": error.__class__.__name__,
            # Plain string: err_message already contains repr(error), so no
            # second repr() (which would wrap the message in extra quotes).
            "message": err_message,
        }