def collect_dataproc_job_metrics()

in tools/dataproc-job-metric-utility/src/collect.py [0:0]


def collect_dataproc_job_metrics(project_id, region, hours, bucket_name,
                                 blob_name, bq_dataset, bq_table):
    """
    Uses the Dataproc Job, Dataproc Cluster, Compute, and GCS APIs to collect
    dataproc job metrics for all jobs that have ran in the last <user-provided>
    hours. Uploads results to GCS.
    """

    # -------------------------------------------------
    # Begin by getting all dataproc jobs in last x hours
    # -------------------------------------------------
    logger.info(f"Retrieving all dataproc jobs in the last {hours} hour(s).")

    creds, project = google.auth.default()
    auth_req = google.auth.transport.requests.Request()
    creds.refresh(auth_req)
    cred_token = creds.token
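    # This access token is used later for the Component Gateway request that
    # fetches YARN application metrics (see the Proxy-Authorization header).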

    dataproc_job_client = dataproc_v1.JobControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com"})
    dataproc_cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={
            "api_endpoint": f"{region}-dataproc.googleapis.com:443"
        })

    # -------------------------------------------------
    # Get Jobs that have started recently
    # -------------------------------------------------

    dataproc_jobs = dataproc_job_client.list_jobs(request={
        "project_id": project_id,
        "region": region
    })

    now = datetime.now()
    min_range = datetime.timestamp(now - timedelta(hours=hours))

    timeframed_jobs = []
    all_jobs = []
    for dataproc_job in dataproc_jobs:
        all_jobs.append(dataproc_job)
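        # status_history[0] is the job's earliest recorded state; fall back to
        # the current status for jobs that have not transitioned yet.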
        first_status = (dataproc_job.status_history[0]
                        if dataproc_job.status_history
                        else dataproc_job.status)
        job_start = datetime.timestamp(first_status.state_start_time)
        if job_start and job_start > min_range:
            timeframed_jobs.append(dataproc_job)

    print(f"All Jobs: {len(all_jobs)}")
    print(f"Jobs in the last {hours} hour(s): {len(timeframed_jobs)}")

    all_metrics = []
    dataproc_job_config = {}
    dataproc_cluster_config = {}
    primary_machine_type_config = {}
    secondary_machine_type_config = {}
    yarn_metrics = {}
    count = 0
    for dataproc_job in timeframed_jobs:
        count += 1
        print("Progress: " + str(round(count / len(timeframed_jobs) * 100, 2)) +
              "%")
        dataproc_job_config = MessageToDict(dataproc_job._pb)
        dataproc_cluster_name = dataproc_job_config.get("placement").get(
            "clusterName")

        try:
            dataproc_cluster = dataproc_cluster_client.get_cluster(
                project_id=project_id,
                region=region,
                cluster_name=dataproc_cluster_name,
            )
            dataproc_cluster_config = MessageToDict(dataproc_cluster._pb)

            # -------------------------------------------------
            # Collect metrics for cluster machine types
            # -------------------------------------------------

            compute_client = compute_v1.MachineTypesClient()

            primary_machine_type = str(
                dataproc_cluster.config.worker_config.machine_type_uri).rsplit(
                    "/", 1)[-1]
            primary_machine_type_config = MessageToDict(
                compute_client.get(
                    project=project_id,
                    zone=region + "-a",
                    machine_type=primary_machine_type,
                )._pb)

            secondary_worker_count = int(
                dataproc_cluster.config.secondary_worker_config.num_instances)
            secondary_machine_type_config = {}
            if secondary_worker_count > 0:
                secondary_machine_type = str(
                    dataproc_cluster.config.secondary_worker_config.
                    machine_type_uri).rsplit("/", 1)[-1]
                secondary_machine_type_config = MessageToDict(
                    compute_client.get(
                        project=project_id,
                        zone=region + "-a",
                        machine_type=secondary_machine_type,
                    )._pb)
        except NotFound:
            logger.info(
                f"Cluster {dataproc_cluster_name} not found for this job; "
                "skipping cluster and machine type metrics.")
            # Reset per-cluster values so details from a previous iteration
            # are not attributed to this job.
            dataproc_cluster_config = None
            primary_machine_type_config = {}
            secondary_machine_type_config = {}

        # -------------------------------------------------
        # Collect YARN metrics for Job if Cluster exists
        # -------------------------------------------------
        yarn_metrics = {}
        if dataproc_cluster_config:
            if dataproc_job.yarn_applications:
                yarn_endpoint = dataproc_cluster.config.endpoint_config.http_ports.get(
                    "YARN ResourceManager")
                application_id = dataproc_job.yarn_applications[
                    0].tracking_url.split("/")[-2]

                base_url = f"{yarn_endpoint}ws/v1/cluster/apps/{application_id}"
                try:
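                    # The request authenticates to the Component Gateway by
                    # passing the OAuth access token in a Proxy-Authorization
                    # header.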
                    headers = {"Proxy-Authorization": f"Bearer {cred_token}"}
                    response = requests.get(url=base_url, headers=headers)
                    # Raise an exception for HTTP errors.
                    response.raise_for_status()
                    yarn_metrics = response.json().get("app")

                except requests.exceptions.RequestException as e:
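                    # A failed YARN metrics request skips this job entirely;
                    # no record is appended to all_metrics for it.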
                    print(str(e))
                    continue

        job_metrics = {
            "dataproc_job_config": dataproc_job_config,
            "dataproc_cluster_config": dataproc_cluster_config,
            "primary_machine_config": primary_machine_type_config,
            "secondary_machine_config": secondary_machine_type_config,
            "yarn_metrics": yarn_metrics,
        }
        print(job_metrics)
        job_metrics = clean_up_keys(job_metrics)
        job_metrics = clean_up_values(job_metrics)
        all_metrics.append(job_metrics)

    if all_metrics:

        # -------------------------------------------------
        # Upload results to GCS
        # -------------------------------------------------
        upload_json_to_gcs(bucket_name=bucket_name,
                           blob_name=blob_name,
                           data=all_metrics)

        # -------------------------------------------------
        # Load results into BigQuery
        # -------------------------------------------------
        load_metrics_to_bigquery(
            project_id=project_id,
            bq_dataset=bq_dataset,
            bq_table=bq_table,
            bucket_name=bucket_name,
            blob_name=blob_name,
        )

        logger.info("Metric collection complete.")
    else:
        logger.error("No Dataproc jobs found in the specified timeframe.")