dataproc/snippets/submit_job_to_cluster.py

#!/usr/bin/env python

# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START dataproc_quickstart]
"""
Command-line program to create a Dataproc cluster, run a PySpark job located in
Cloud Storage on the cluster, then delete the cluster after the job completes.

Usage:
    python submit_job_to_cluster.py --project_id <PROJECT_ID> --region <REGION> \
        --cluster_name <CLUSTER_NAME> --gcs_bucket <GCS_BUCKET> \
        [--pyspark_file <PYSPARK_FILE>]
"""

import argparse
import os
import re

from google.cloud import dataproc_v1
from google.cloud import storage

DEFAULT_FILENAME = "pyspark_sort.py"
waiting_callback = False


def get_pyspark_file(pyspark_file=None):
    """Returns the PySpark job file and its filename.

    Falls back to pyspark_sort.py in this script's directory when no file
    is provided.
    """
    if pyspark_file:
        f = open(pyspark_file, "rb")
        return f, os.path.basename(pyspark_file)
    else:
        current_dir = os.path.dirname(os.path.abspath(__file__))
        f = open(os.path.join(current_dir, DEFAULT_FILENAME), "rb")
        return f, DEFAULT_FILENAME


def get_region_from_zone(zone):
    """Derives the region from a zone name, e.g. "us-central1-b" -> "us-central1"."""
    try:
        region_as_list = zone.split("-")[:-1]
        return "-".join(region_as_list)
    except (AttributeError, IndexError, ValueError):
        raise ValueError("Invalid zone provided, please check your input.")


def upload_pyspark_file(project, bucket_name, filename, spark_file):
    """Uploads the PySpark file in this directory to the configured input bucket."""
    print("Uploading pyspark file to Cloud Storage.")
    client = storage.Client(project=project)
    bucket = client.get_bucket(bucket_name)
    blob = bucket.blob(filename)
    blob.upload_from_file(spark_file)


def download_output(project, cluster_id, output_bucket, job_id):
    """Downloads the output file from Cloud Storage and returns it as a string."""
    print("Downloading output file.")
    client = storage.Client(project=project)
    bucket = client.get_bucket(output_bucket)
    output_blob = (
        "google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000".format(
            cluster_id, job_id
        )
    )
    return bucket.blob(output_blob).download_as_bytes().decode("utf-8")


# [START dataproc_create_cluster]
def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")
    # [END dataproc_create_cluster]

    spark_file, spark_filename = get_pyspark_file(pyspark_file)
    upload_pyspark_file(project_id, gcs_bucket, spark_filename, spark_file)

    # [START dataproc_submit_job]
    # Create the job client.
    job_client = dataproc_v1.JobControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the job config.
    job = {
        "placement": {"cluster_name": cluster_name},
        "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
    }

    operation = job_client.submit_job_as_operation(
        request={"project_id": project_id, "region": region, "job": job}
    )
    response = operation.result()

    # Dataproc job output is saved to the Cloud Storage bucket
    # allocated to the job. Use regex to obtain the bucket and blob info.
    matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

    output = (
        storage.Client()
        .get_bucket(matches.group(1))
        .blob(f"{matches.group(2)}.000000000")
        .download_as_bytes()
        .decode("utf-8")
    )

    print(f"Job finished successfully: {output}\r\n")
    # [END dataproc_submit_job]

    # [START dataproc_delete_cluster]
    # Delete the cluster once the job has terminated.
    operation = cluster_client.delete_cluster(
        request={
            "project_id": project_id,
            "region": region,
            "cluster_name": cluster_name,
        }
    )
    operation.result()

    print(f"Cluster {cluster_name} successfully deleted.")
    # [END dataproc_delete_cluster]


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--project_id",
        type=str,
        required=True,
        help="Project to use for creating resources.",
    )
    parser.add_argument(
        "--region",
        type=str,
        required=True,
        help="Region where the resources should live.",
    )
    parser.add_argument(
        "--cluster_name",
        type=str,
        required=True,
        help="Name to use for creating a cluster.",
    )
    parser.add_argument(
        "--gcs_bucket", help="Bucket to upload Pyspark file to", required=True
    )
    parser.add_argument(
        "--pyspark_file", help="Pyspark filename. Defaults to pyspark_sort.py"
    )

    args = parser.parse_args()
    quickstart(
        args.project_id,
        args.region,
        args.cluster_name,
        args.gcs_bucket,
        args.pyspark_file,
    )
# [END dataproc_quickstart]
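
# A sample command line, using the flags defined by the argparse block above;
# the project, region, cluster, and bucket names here are hypothetical:
#
#   python submit_job_to_cluster.py --project_id my-project --region us-central1 \
#       --cluster_name my-quickstart-cluster --gcs_bucket my-staging-bucket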