# functions/data-processing-engines/dataflow-flextemplate-job-executor/main.py
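# NOTE: a minimal sketch of the module-level setup run_dataflow_job relies on.
# The real imports, constant values, and client construction live elsewhere in
# this file; the names below (function_name, service) are assumptions inferred
# from how the function uses them.
from googleapiclient.discovery import build

function_name = "dataflow-flextemplate-job-executor"  # assumed: identifies this executor's job definitions
service = build("dataflow", "v1b3", cache_discovery=False)  # Dataflow REST API client
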
def run_dataflow_job(dataflow_job_name, job_name, request_json):
    # Resolve the job's configuration from the definitions bucket named in the
    # request payload (extract_params is defined elsewhere in this module).
    extracted_params = extract_params(
        bucket_name=request_json.get("workflow_properties").get("jobs_definitions_bucket"),
        job_name=job_name,
        function_name=function_name
    )
    dataflow_location = extracted_params.get("dataflow_location")
    dataflow_project = extracted_params.get("project_id")
    dataflow_template_name = extracted_params.get("dataflow_template_name")
    dataflow_temp_bucket = extracted_params.get("dataflow_temp_bucket")
    dataflow_job_params = extracted_params.get("dataflow_job_params")
    dataflow_max_workers = extracted_params.get("dataflow_max_workers")
    network = extracted_params.get("network")
    subnetwork = extracted_params.get("subnetwork")
    dataflow_template_version = extracted_params.get("dataflow_template_version")
    # GCS path of the Google-provided Flex Template container spec.
    gcs_path = "gs://dataflow-templates-{region}/{version}/flex/{template}".format(
        region=dataflow_location,
        version=dataflow_template_version,
        template=dataflow_template_name)
    body = {
        "launchParameter": {
            "jobName": dataflow_job_name,
            "parameters": dataflow_job_params,
            "containerSpecGcsPath": gcs_path,
            "environment": {
                "tempLocation": "gs://{bucket}/dataflow/temp".format(bucket=dataflow_temp_bucket),
                "maxWorkers": str(dataflow_max_workers),
                "network": str(network),
                "subnetwork": str(subnetwork)
            }
        }
    }
    # Launch the Flex Template through the Dataflow REST API client.
    request = service.projects().locations().flexTemplates().launch(
        projectId=dataflow_project,
        location=dataflow_location,
        body=body
    )
    response = request.execute()
    # Return the Dataflow job id with the framework's "aef_" prefix.
    return "aef_" + response.get("job").get("id")