tools/dataproc-job-metric-utility/src/collect.py:

""" A utility to collect dataproc job metrics over a given timeframe. """ from datetime import timedelta, datetime import json import argparse import logging as logger from google.cloud import dataproc_v1, compute_v1, bigquery, storage from google.api_core.exceptions import NotFound import requests import google.auth.transport.requests from google.protobuf.json_format import MessageToDict def to_camel_case(key_str: str): """Converts a snake_case string to camelCase.""" key_str = key_str.replace("-", "_").replace(" ", "_") # Normalize delimiters components = key_str.split("_") return components[0] + "".join(x.title() for x in components[1:]) def clean_up_keys(data): """Creates a new dictionary with camelCase keys from a given dictionary.""" camel_case_dict = {} for key, value in data.items(): if "HiveServer2" in key: key = "HiveServer2" if isinstance(value, dict): # Recursively handle nested dictionaries value = clean_up_keys(value) camel_case_dict[to_camel_case(key)] = value return camel_case_dict def clean_up_values(data: dict): """Replaces empty dictionaries with None in a dictionary, including nested ones.""" for key, value in data.items(): if isinstance(value, dict): clean_up_values(value) if not value: data[key] = ( None # BQ: Unsupported empty struct type for field ) if "properties" in key: data[key] = str(data[key]) # wrap properties maps as strings return data def upload_json_to_gcs(bucket_name: str, blob_name: str, data: dict): """ Upload json data to a GCS location. """ logger.info(f"Uploading results to gs://{bucket_name}/{blob_name}") storage_client = storage.Client() bucket = storage_client.bucket(bucket_name) blob = bucket.blob(blob_name) ndjson_string = "\n".join(json.dumps(item) for item in data) # Upload the string to GCS blob.upload_from_string(ndjson_string, content_type="application/json") def load_metrics_to_bigquery( bq_dataset: str, bq_table: str, bucket_name: str, blob_name: str, project_id: str, kms_key_name: str = None, ): """ Load a GCS object containing dataproc metrics into a BQ table """ logger.info( f"Loading results to BigQuery: {project_id}.{bq_dataset}.{bq_table}") bq_client = bigquery.Client(project=project_id) dataset_ref = bq_client.dataset(bq_dataset) table_ref = dataset_ref.table(bq_table) if kms_key_name: job_config = bigquery.LoadJobConfig( source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, autodetect=True, create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED, write_disposition=bigquery.WriteDisposition.WRITE_APPEND, destination_encryption_configuration=bigquery. EncryptionConfiguration(kms_key_name=kms_key_name), ) else: job_config = bigquery.LoadJobConfig( source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, autodetect=True, create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED, write_disposition=bigquery.WriteDisposition.WRITE_APPEND, ) load_job = bq_client.load_table_from_uri("gs://" + bucket_name + "/" + blob_name, table_ref, job_config=job_config) load_job.result() # Waits for the job to complete def collect_dataproc_job_metrics(project_id, region, hours, bucket_name, blob_name, bq_dataset, bq_table): """ Uses the Dataproc Job, Dataproc Cluster, Compute, and GCS APIs to collect dataproc job metrics for all jobs that have ran in the last <user-provided> hours. Uploads results to GCS. 
""" # ------------------------------------------------- # Begin by getting all dataproc jobs in last x hours # ------------------------------------------------- logger.info(f"Retrieving all dataproc jobs in the last {hours} hour(s).") creds, project = google.auth.default() auth_req = google.auth.transport.requests.Request() creds.refresh(auth_req) cred_token = creds.token dataproc_job_client = dataproc_v1.JobControllerClient( client_options={"api_endpoint": f"{region}-dataproc.googleapis.com"}) dataproc_cluster_client = dataproc_v1.ClusterControllerClient( client_options={ "api_endpoint": f"{region}-dataproc.googleapis.com:443" }) # ------------------------------------------------- # Get Jobs that have started recently # ------------------------------------------------- dataproc_jobs = dataproc_job_client.list_jobs(request={ "project_id": project_id, "region": region }) now = datetime.now() min_range = datetime.timestamp(now - timedelta(hours=hours)) timeframed_jobs = [] all_jobs = [] for dataproc_job in dataproc_jobs: all_jobs.append(dataproc_job) job_start = datetime.timestamp( dataproc_job.status_history[0].state_start_time) if job_start and job_start > min_range: timeframed_jobs.append(dataproc_job) all_job_counts = str(len(all_jobs)) timeframed_jobs_counts = str(len(timeframed_jobs)) print(f"All Jobs: {all_job_counts}") print(f"Jobs in the last {hours} hours: {timeframed_jobs_counts}") all_metrics = [] dataproc_job_config = {} dataproc_cluster_config = {} primary_machine_type_config = {} secondary_machine_type_config = {} yarn_metrics = {} count = 0 for dataproc_job in timeframed_jobs: count += 1 print("Progress: " + str(round(count / len(timeframed_jobs) * 100, 2)) + "%") dataproc_job_config = MessageToDict(dataproc_job._pb) dataproc_cluster_name = dataproc_job_config.get("placement").get( "clusterName") try: dataproc_cluster = dataproc_cluster_client.get_cluster( project_id=project_id, region=region, cluster_name=dataproc_cluster_name, ) dataproc_cluster_config = MessageToDict(dataproc_cluster._pb) # ------------------------------------------------- # Collect metrics for cluster machine types # ------------------------------------------------- compute_client = compute_v1.MachineTypesClient() primary_machine_type = str( dataproc_cluster.config.worker_config.machine_type_uri).rsplit( "/", 1)[-1] primary_machine_type_config = MessageToDict( compute_client.get( project=project_id, zone=region + "-a", machine_type=primary_machine_type, )._pb) secondary_worker_count = int( dataproc_cluster.config.secondary_worker_config.num_instances) secondary_machine_type_config = {} if secondary_worker_count > 0: secondary_machine_type = str( dataproc_cluster.config.secondary_worker_config. 
                    machine_type_uri).rsplit("/", 1)[-1]
                secondary_machine_type_config = MessageToDict(
                    compute_client.get(
                        project=project_id,
                        zone=region + "-a",
                        machine_type=secondary_machine_type,
                    )._pb)
        except NotFound:
            logger.info("Cluster not found for job id.")
            dataproc_cluster_config = None

        # -------------------------------------------------
        # Collect YARN metrics for Job if Cluster exists
        # -------------------------------------------------
        yarn_metrics = {}
        if dataproc_cluster_config:
            if dataproc_job.yarn_applications:
                yarn_endpoint = dataproc_cluster.config.endpoint_config.http_ports.get(
                    "YARN ResourceManager")
                application_id = dataproc_job.yarn_applications[
                    0].tracking_url.split("/")[-2]
                base_url = f"{yarn_endpoint}ws/v1/cluster/apps/{application_id}"
                try:
                    headers = {"Proxy-Authorization": f"Bearer {cred_token}"}
                    response = requests.get(url=base_url, headers=headers)
                    response.raise_for_status()  # Raise an exception for HTTP errors
                    yarn_metrics = response.json().get("app")
                except requests.exceptions.RequestException as e:
                    print(str(e))
                    continue

        job_metrics = {
            "dataproc_job_config": dataproc_job_config,
            "dataproc_cluster_config": dataproc_cluster_config,
            "primary_machine_config": primary_machine_type_config,
            "secondary_machine_config": secondary_machine_type_config,
            "yarn_metrics": yarn_metrics,
        }
        print(job_metrics)
        job_metrics = clean_up_keys(job_metrics)
        job_metrics = clean_up_values(job_metrics)
        all_metrics.append(job_metrics)

    if all_metrics:
        # -------------------------------------------------
        # Upload results to GCS
        # -------------------------------------------------
        upload_json_to_gcs(bucket_name=bucket_name,
                           blob_name=blob_name,
                           data=all_metrics)

        # -------------------------------------------------
        # Load results into BigQuery
        # -------------------------------------------------
        load_metrics_to_bigquery(
            project_id=project_id,
            bq_dataset=bq_dataset,
            bq_table=bq_table,
            bucket_name=bucket_name,
            blob_name=blob_name,
        )
        logger.info("Metric collection complete.")
    else:
        logger.error("No Dataproc jobs found in the specified timeframe.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Collect Dataproc job metrics and store them in BigQuery.")

    # Required Arguments
    parser.add_argument("--project_id",
                        type=str,
                        required=True,
                        help="Google Cloud project ID")
    parser.add_argument(
        "--region",
        type=str,
        required=True,
        help="Cloud region where the Dataproc jobs ran",
    )
    parser.add_argument(
        "--bq_dataset",
        type=str,
        required=True,
        help="BigQuery dataset to store metrics",
    )
    parser.add_argument(
        "--bucket_name",
        type=str,
        required=True,
        help="GCS bucket to store metrics data",
    )
    parser.add_argument("--bq_table",
                        type=str,
                        required=True,
                        help="BigQuery table to store metrics")

    # Optional Arguments (with defaults)
    parser.add_argument(
        "--hours",
        type=int,
        default=24,
        help="Number of hours to look back for job metrics (default: 24)",
    )
    parser.add_argument(
        "--blob_name",
        type=str,
        default="dataproc_metrics.json",
        help="Name of the GCS metrics blob",
    )

    args = parser.parse_args()

    # Call the function with the parsed arguments
    collect_dataproc_job_metrics(
        args.project_id,
        args.region,
        args.hours,
        args.bucket_name,
        args.blob_name,
        args.bq_dataset,
        args.bq_table,
    )
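
# Example invocation (a sketch only; the project, region, dataset, table, and
# bucket values below are hypothetical placeholders, not values from this repo):
#
#   python collect.py \
#       --project_id my-project \
#       --region us-central1 \
#       --bq_dataset dataproc_metrics \
#       --bq_table job_metrics \
#       --bucket_name my-metrics-bucket \
#       --hours 48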