in functions/data-processing-engines/dataflow-flextemplate-job-executor/main.py [0:0]
def get_dataflow_state(job_id, job_name, request_json):
extracted_params = extract_params(
bucket_name=request_json.get("workflow_properties").get("jobs_definitions_bucket"),
job_name=job_name,
function_name=function_name
)
dataflow_location = extracted_params.get("dataflow_location")
dataflow_project = extracted_params.get("project_id")
get_job_request = service.projects().locations().jobs().get(location=dataflow_location, projectId=dataflow_project,
jobId=re.sub(r"^aef_", "", job_id))
print("Getting status execute ")
job_status = get_job_request.execute()
print(f"Job status: {str(job_status)}")
return job_status['currentState']