in dataproc/snippets/submit_job_to_cluster.py [0:0]
def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    """Create a Dataproc cluster, run a PySpark job on it, then delete it.

    Args:
        project_id: Google Cloud project to create the cluster in.
        region: Dataproc region; also used to build the regional API endpoint.
        cluster_name: Name of the temporary cluster to create and delete.
        gcs_bucket: Cloud Storage bucket the PySpark file is uploaded to and
            where Dataproc writes the driver output.
        pyspark_file: PySpark script (local path/handle understood by
            get_pyspark_file) to upload and run.

    Raises:
        ValueError: If the driver output URI returned by Dataproc does not
            look like a ``gs://bucket/object`` URI.
    """
    # Create the cluster client. Dataproc requires the regional endpoint.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config: one master, two workers.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster and block until the long-running operation finishes.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()
    print(f"Cluster created successfully: {result.cluster_name}")
    # [END dataproc_create_cluster]

    # Everything after cluster creation runs under try/finally so the cluster
    # is deleted even if the upload, job submission, or output fetch fails —
    # otherwise a failed run leaks a running (billed) cluster.
    try:
        spark_file, spark_filename = get_pyspark_file(pyspark_file)
        upload_pyspark_file(project_id, gcs_bucket, spark_filename, spark_file)

        # [START dataproc_submit_job]
        # Create the job client (same regional endpoint as the cluster client).
        job_client = dataproc_v1.JobControllerClient(
            client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
        )

        # Create the job config pointing at the uploaded PySpark script.
        job = {
            "placement": {"cluster_name": cluster_name},
            "pyspark_job": {
                "main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"
            },
        }

        operation = job_client.submit_job_as_operation(
            request={"project_id": project_id, "region": region, "job": job}
        )
        response = operation.result()

        # Dataproc job output is saved to the Cloud Storage bucket
        # allocated to the job. Use regex to obtain the bucket and blob info.
        matches = re.match(r"gs://(.*?)/(.*)", response.driver_output_resource_uri)
        if matches is None:
            # Fail with a clear message instead of an AttributeError below.
            raise ValueError(
                "Unexpected driver output URI: "
                f"{response.driver_output_resource_uri}"
            )

        output = (
            storage.Client()
            .get_bucket(matches.group(1))
            # ".000000000" is the first driver-output part file.
            .blob(f"{matches.group(2)}.000000000")
            .download_as_bytes()
            .decode("utf-8")
        )
        print(f"Job finished successfully: {output}\r\n")
        # [END dataproc_submit_job]
    finally:
        # [START dataproc_delete_cluster]
        # Delete the cluster once the job has terminated (or failed).
        operation = cluster_client.delete_cluster(
            request={
                "project_id": project_id,
                "region": region,
                "cluster_name": cluster_name,
            }
        )
        operation.result()
        print(f"Cluster {cluster_name} successfully deleted.")