def get_dataflow_state()

in functions/data-processing-engines/dataflow-flextemplate-job-executor/main.py [0:0]


def get_dataflow_state(job_id, job_name, request_json):
    """Fetch a job through the module-level ``service`` client and return its state.

    Args:
        job_id: Job identifier; a leading ``aef_`` prefix, if present, is
            removed before the lookup.
        job_name: Name forwarded to ``extract_params`` to locate the job's
            parameters.
        request_json: Mapping that must contain a ``workflow_properties``
            mapping with a ``jobs_definitions_bucket`` entry.

    Returns:
        The ``currentState`` value of the job resource returned by the API.

    Raises:
        KeyError: If the API response has no ``currentState`` key.
    """
    # NOTE(review): ``service`` is presumably the Dataflow API discovery
    # client built at module level -- confirm against the rest of the file.
    definitions_bucket = request_json.get("workflow_properties").get("jobs_definitions_bucket")
    params = extract_params(
        bucket_name=definitions_bucket,
        job_name=job_name,
        function_name=function_name,
    )

    location = params.get("dataflow_location")
    project = params.get("project_id")

    jobs_resource = service.projects().locations().jobs()
    status_request = jobs_resource.get(
        location=location,
        projectId=project,
        # The lookup uses the id without its leading "aef_" prefix.
        jobId=re.sub(r"^aef_", "", job_id),
    )

    print("Getting status execute ")
    job_status = status_request.execute()
    print(f"Job status: {str(job_status)}")

    return job_status['currentState']