def create_batch_job()

in functions/data-processing-engines/dataproc-serverless-job-executor/main.py
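
This excerpt references names defined at module level in main.py (json, datetime, requests, and an authenticated credentials object). A minimal sketch of the assumed preamble follows; the exact scopes and credential setup in the real module may differ:

import datetime
import json

import google.auth
import requests
from google.auth.transport.requests import Request

# Application Default Credentials, refreshed before each API call below.
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)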


def create_batch_job(workflow_name, job_name, query_variables, workflow_properties, extracted_params):
    """
    calls a dataproc serverless job.

    Args:
        request_json (dict) : event dictionary
    Returns:
        str: Id or status of the dataproc serverless batch job
    """

    dataproc_serverless_project_id = extracted_params.get('dataproc_serverless_project_id')
    dataproc_serverless_region = extracted_params.get('dataproc_serverless_region')
    jar_file_location = extracted_params.get('jar_file_location')
    spark_history_server_cluster = extracted_params.get('spark_history_server_cluster')
    spark_app_main_class = extracted_params.get('spark_app_main_class')
    spark_args = extracted_params.get('spark_args')
    dataproc_serverless_runtime_version = extracted_params.get('dataproc_serverless_runtime_version')
    dataproc_service_account = extracted_params.get('dataproc_service_account')
    spark_app_properties = extracted_params.get('spark_app_properties')
    subnetwork = extracted_params.get('subnetwork')

    # Properties may arrive as a JSON-encoded string; normalize to a dict.
    if isinstance(spark_app_properties, str):
        spark_app_properties = json.loads(spark_app_properties)

    # Mint a fresh OAuth2 access token for the REST call.
    credentials.refresh(Request())
    headers = {"Authorization": f"Bearer {credentials.token}"}

    # Epoch seconds, used to build a unique batch ID below.
    curr_dt = datetime.datetime.now()
    timestamp = int(round(curr_dt.timestamp()))

    # Batch resource payload. Keys follow proto snake_case, which the Dataproc
    # REST API's JSON parser accepts alongside lowerCamelCase.
    params = {
        "spark_batch": {
            "jar_file_uris": [jar_file_location],
            "main_class": spark_app_main_class,
            "args": spark_args
        },
        "runtime_config": {
            "version": dataproc_serverless_runtime_version,
            "properties": spark_app_properties
        },
        "environment_config": {
            "execution_config": {
                "service_account": dataproc_service_account,
                "subnetwork_uri": f"projects/{dataproc_serverless_project_id}/{subnetwork}"
            }
        }
    }

    # Attach a Spark History Server config only when a cluster name is provided.
    if spark_history_server_cluster:
        spark_history_server_cluster_path = (
            f"projects/{dataproc_serverless_project_id}/regions/"
            f"{dataproc_serverless_region}/clusters/{spark_history_server_cluster}"
        )
        params["environment_config"]["peripherals_config"] = {
            "spark_history_server_config": {
                "dataproc_cluster": spark_history_server_cluster_path
            },
        }

    # Log the full request payload for debugging.
    print(params)

    batch_id = f"aef-{timestamp}"

    url = (f"https://dataproc.googleapis.com/v1/projects/{dataproc_serverless_project_id}/"
           f"locations/{dataproc_serverless_region}/batches?batchId={batch_id}")

    response = requests.post(url, json=params, headers=headers)

    if response.status_code == 200:
        print("response::" + str(response))
        return batch_id
    else:
        error_message = f"Dataproc API CREATE request failed. Status code: {response.status_code}"
        print(error_message)
        print(response.text)
        raise Exception(error_message)
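
For reference, a hypothetical invocation might look like the sketch below. Every value is an illustrative placeholder (project, bucket, class, and service account names are not real), and the first four arguments are passed through unused by this function:

if __name__ == "__main__":
    sample_params = {
        "dataproc_serverless_project_id": "my-project",          # placeholder
        "dataproc_serverless_region": "us-central1",
        "jar_file_location": "gs://my-bucket/jobs/etl-job.jar",  # placeholder
        "spark_history_server_cluster": "phs-cluster",           # optional
        "spark_app_main_class": "com.example.EtlJob",            # placeholder
        "spark_args": ["--run_date=2024-01-01"],
        "dataproc_serverless_runtime_version": "2.2",
        "dataproc_service_account": "dataproc-sa@my-project.iam.gserviceaccount.com",
        "spark_app_properties": {"spark.executor.cores": "4"},
        "subnetwork": "regions/us-central1/subnetworks/default",
    }
    batch_id = create_batch_job("demo-workflow", "demo-job", {}, {}, sample_params)
    print(f"Submitted batch: {batch_id}")

On success the function returns only the batch ID; callers that need completion status would poll the batches.get endpoint separately.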