def run_dataflow_job()

in functions/data-processing-engines/dataflow-flextemplate-job-executor/main.py [0:0]


def run_dataflow_job(dataflow_job_name, job_name, request_json):
    """Launch a Dataflow Flex Template job and return its prefixed job id.

    Job settings are loaded with ``extract_params`` from the bucket named by
    ``request_json["workflow_properties"]["jobs_definitions_bucket"]`` and
    submitted through the ``flexTemplates().launch`` call of ``service``.

    NOTE(review): ``function_name`` and ``service`` are not defined in this
    function; they are presumably module-level names -- confirm.

    Args:
        dataflow_job_name: Name given to the launched Dataflow job.
        job_name: Job identifier forwarded to ``extract_params``.
        request_json: Request payload; must contain a ``workflow_properties``
            mapping.

    Returns:
        The ``id`` of the ``job`` entry in the launch response, prefixed
        with ``"aef_"``.
    """
    extracted_params = extract_params(
        bucket_name=request_json.get("workflow_properties").get("jobs_definitions_bucket"),
        job_name=job_name,
        function_name=function_name
    )

    dataflow_location = extracted_params.get("dataflow_location")
    dataflow_project = extracted_params.get("project_id")

    gcs_path = "gs://dataflow-templates-{region}/{version}/flex/{template}".format(
        region=dataflow_location,
        version=extracted_params.get("dataflow_template_version"),
        template=extracted_params.get("dataflow_template_name"),
    )

    environment = {
        "tempLocation": "gs://{bucket}/dataflow/temp".format(
            bucket=extracted_params.get("dataflow_temp_bucket")
        ),
    }
    # Optional settings: only send the ones that are actually configured.
    # Stringifying a missing value would submit the literal "None" as the
    # worker count / network name instead of letting the service default.
    optional_settings = (
        ("maxWorkers", extracted_params.get("dataflow_max_workers")),
        ("network", extracted_params.get("network")),
        ("subnetwork", extracted_params.get("subnetwork")),
    )
    for key, value in optional_settings:
        if value is not None:
            environment[key] = str(value)

    body = {
        "launchParameter": {
            "jobName": dataflow_job_name,
            "parameters": extracted_params.get("dataflow_job_params"),
            "containerSpecGcsPath": gcs_path,
            "environment": environment,
        }
    }

    request = service.projects().locations().flexTemplates().launch(
        projectId=dataflow_project,
        location=dataflow_location,
        body=body
    )
    response = request.execute()
    return "aef_" + response.get("job").get("id")