in functions/data-processing-engines/dataproc-serverless-job-executor/main.py [0:0]
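# The function below depends on module-level imports and credentials defined
# earlier in main.py; this is a minimal sketch of that setup, assuming
# Application Default Credentials with the cloud-platform scope.
import datetime
import json

import google.auth
import requests
from google.auth.transport.requests import Request

credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
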
def create_batch_job(workflow_name, job_name, query_variables, workflow_properties, extracted_params):
"""
calls a dataproc serverless job.
Args:
request_json (dict) : event dictionary
Returns:
str: Id or status of the dataproc serverless batch job
"""
    dataproc_serverless_project_id = extracted_params.get('dataproc_serverless_project_id')
    dataproc_serverless_region = extracted_params.get('dataproc_serverless_region')
    jar_file_location = extracted_params.get('jar_file_location')
    spark_history_server_cluster = extracted_params.get('spark_history_server_cluster')
    spark_app_main_class = extracted_params.get('spark_app_main_class')
    spark_args = extracted_params.get('spark_args')
    dataproc_serverless_runtime_version = extracted_params.get('dataproc_serverless_runtime_version')
    dataproc_service_account = extracted_params.get('dataproc_service_account')
    spark_app_properties = extracted_params.get('spark_app_properties')
    subnetwork = extracted_params.get('subnetwork')
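    # spark_app_properties may arrive either as a dict or as a JSON-encoded
    # string, e.g. '{"spark.executor.memory": "4g"}' (illustrative value).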
    if isinstance(spark_app_properties, str):
        spark_app_properties = json.loads(spark_app_properties)
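    # Mint a fresh OAuth2 access token for the REST call below.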
    credentials.refresh(Request())
    headers = {"Authorization": f"Bearer {credentials.token}"}
    curr_dt = datetime.datetime.now()
    timestamp = int(round(curr_dt.timestamp()))
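    # Request body for batches.create. Keys use the Batch proto's snake_case
    # field names, which proto3 JSON parsing accepts alongside camelCase.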
    params = {
        "spark_batch": {
            "jar_file_uris": [jar_file_location],
            "main_class": spark_app_main_class,
            "args": spark_args
        },
        "runtime_config": {
            "version": dataproc_serverless_runtime_version,
            "properties": spark_app_properties
        },
        "environment_config": {
            "execution_config": {
                "service_account": dataproc_service_account,
                "subnetwork_uri": f"projects/{dataproc_serverless_project_id}/{subnetwork}"
            }
        }
    }
    # Check if spark_history_server_cluster is present and not null
    if spark_history_server_cluster:
        spark_history_server_cluster_path = (
            f"projects/{dataproc_serverless_project_id}/regions/"
            f"{dataproc_serverless_region}/clusters/{spark_history_server_cluster}"
        )
        params["environment_config"]["peripherals_config"] = {
            "spark_history_server_config": {
                "dataproc_cluster": spark_history_server_cluster_path
            },
        }
    print(params)
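    # Dataproc batch IDs allow lowercase letters, digits, and hyphens
    # (4-63 characters); "aef-<timestamp>" satisfies that.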
batch_id = f"aef-{timestamp}"
url = (f"https://dataproc.googleapis.com/v1/projects/{dataproc_serverless_project_id}/"
f"locations/{dataproc_serverless_region}/batches?batchId={batch_id}")
response = requests.post(url, json=params, headers=headers)
if response.status_code == 200:
print("response::" + str(response))
return batch_id
    else:
        error_message = f"Dataproc API CREATE request failed. Status code: {response.status_code}"
        print(error_message)
        print(response.text)
        raise Exception(error_message)
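
# --- Usage sketch (illustrative only) ---------------------------------------
# A hedged example of invoking create_batch_job locally; every value below is
# hypothetical, and in the deployed function extracted_params is built from
# the incoming event payload instead.
if __name__ == "__main__":
    sample_params = {
        "dataproc_serverless_project_id": "my-project",  # hypothetical
        "dataproc_serverless_region": "us-central1",  # hypothetical
        "jar_file_location": "gs://my-bucket/jars/app.jar",  # hypothetical
        "spark_app_main_class": "com.example.spark.Main",  # hypothetical
        "spark_args": ["--run_date", "2024-01-01"],  # hypothetical
        "dataproc_serverless_runtime_version": "2.2",
        "dataproc_service_account": "runner@my-project.iam.gserviceaccount.com",  # hypothetical
        "spark_app_properties": '{"spark.executor.memory": "4g"}',
        "subnetwork": "regions/us-central1/subnetworks/default",  # hypothetical
        "spark_history_server_cluster": None,
    }
    batch_id = create_batch_job("sample-workflow", "sample-job", {}, {}, sample_params)
    print(f"Submitted batch: {batch_id}")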