in tools/dataproc-job-metric-utility/src/collect.py [0:0]
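# NOTE: module-level imports and logger setup assumed by this excerpt; the
# exact statements in the original file may differ. Helpers referenced below
# (clean_up_keys, clean_up_values, upload_json_to_gcs, load_metrics_to_bigquery)
# are defined elsewhere in this file and are not shown here.
import logging
from datetime import datetime, timedelta

import google.auth
import google.auth.transport.requests
import requests
from google.api_core.exceptions import NotFound
from google.cloud import compute_v1, dataproc_v1
from google.protobuf.json_format import MessageToDict

logger = logging.getLogger(__name__)

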
def collect_dataproc_job_metrics(project_id, region, hours, bucket_name,
blob_name, bq_dataset, bq_table):
"""
Uses the Dataproc Job, Dataproc Cluster, Compute, and GCS APIs to collect
dataproc job metrics for all jobs that have ran in the last <user-provided>
hours. Uploads results to GCS.
"""
    # -------------------------------------------------
    # Authenticate and create Dataproc job/cluster API clients
    # -------------------------------------------------
logger.info(f"Retrieving all dataproc jobs in the last {hours} hour(s).")
creds, project = google.auth.default()
auth_req = google.auth.transport.requests.Request()
creds.refresh(auth_req)
cred_token = creds.token
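    # The refreshed OAuth access token is reused later to authenticate the
    # YARN ResourceManager request made through the Component Gateway.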
dataproc_job_client = dataproc_v1.JobControllerClient(
client_options={"api_endpoint": f"{region}-dataproc.googleapis.com"})
dataproc_cluster_client = dataproc_v1.ClusterControllerClient(
client_options={
"api_endpoint": f"{region}-dataproc.googleapis.com:443"
})
# -------------------------------------------------
# Get Jobs that have started recently
# -------------------------------------------------
dataproc_jobs = dataproc_job_client.list_jobs(request={
"project_id": project_id,
"region": region
})
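    # list_jobs returns a pager that transparently fetches every page of
    # results for the project/region; jobs are filtered by start time below.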
now = datetime.now()
min_range = datetime.timestamp(now - timedelta(hours=hours))
timeframed_jobs = []
all_jobs = []
    for dataproc_job in dataproc_jobs:
        all_jobs.append(dataproc_job)
        # status_history can be empty for a job still in its first state, so
        # fall back to the current status timestamp in that case.
        if dataproc_job.status_history:
            job_start = datetime.timestamp(
                dataproc_job.status_history[0].state_start_time)
        else:
            job_start = datetime.timestamp(
                dataproc_job.status.state_start_time)
        if job_start > min_range:
            timeframed_jobs.append(dataproc_job)
    logger.info(f"All jobs: {len(all_jobs)}")
    logger.info(f"Jobs in the last {hours} hour(s): {len(timeframed_jobs)}")
    all_metrics = []
    count = 0
    for dataproc_job in timeframed_jobs:
        count += 1
        logger.info(
            f"Progress: {round(count / len(timeframed_jobs) * 100, 2)}%")
        # Reset per-job containers so values from a previous iteration are not
        # carried over when a cluster or machine-type lookup below fails.
        dataproc_cluster_config = {}
        primary_machine_type_config = {}
        secondary_machine_type_config = {}
        yarn_metrics = {}
        dataproc_job_config = MessageToDict(dataproc_job._pb)
        dataproc_cluster_name = dataproc_job_config.get("placement", {}).get(
            "clusterName")
try:
dataproc_cluster = dataproc_cluster_client.get_cluster(
project_id=project_id,
region=region,
cluster_name=dataproc_cluster_name,
)
dataproc_cluster_config = MessageToDict(dataproc_cluster._pb)
# -------------------------------------------------
# Collect metrics for cluster machine types
# -------------------------------------------------
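            # NOTE: the zone is assumed to be "<region>-a"; machine-type specs
            # (vCPUs, memory) are consistent across zones, so any zone offering
            # the type would return the same values.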
compute_client = compute_v1.MachineTypesClient()
primary_machine_type = str(
dataproc_cluster.config.worker_config.machine_type_uri).rsplit(
"/", 1)[-1]
primary_machine_type_config = MessageToDict(
compute_client.get(
project=project_id,
zone=region + "-a",
machine_type=primary_machine_type,
)._pb)
secondary_worker_count = int(
dataproc_cluster.config.secondary_worker_config.num_instances)
secondary_machine_type_config = {}
if secondary_worker_count > 0:
secondary_machine_type = str(
dataproc_cluster.config.secondary_worker_config.
machine_type_uri).rsplit("/", 1)[-1]
secondary_machine_type_config = MessageToDict(
compute_client.get(
project=project_id,
zone=region + "-a",
machine_type=secondary_machine_type,
)._pb)
except NotFound:
logger.info("Cluster not found for job id.")
dataproc_cluster_config = None
# -------------------------------------------------
# Collect YARN metrics for Job if Cluster exists
# -------------------------------------------------
yarn_metrics = {}
if dataproc_cluster_config:
if dataproc_job.yarn_applications:
yarn_endpoint = dataproc_cluster.config.endpoint_config.http_ports.get(
"YARN ResourceManager")
application_id = dataproc_job.yarn_applications[
0].tracking_url.split("/")[-2]
base_url = f"{yarn_endpoint}ws/v1/cluster/apps/{application_id}"
                try:
                    headers = {"Proxy-Authorization": f"Bearer {cred_token}"}
                    response = requests.get(url=base_url, headers=headers)
                    # Raise an exception for HTTP errors so they are handled below.
                    response.raise_for_status()
                    yarn_metrics = response.json().get("app")
                except requests.exceptions.RequestException as e:
                    logger.warning(
                        f"Failed to fetch YARN metrics for {application_id}: {e}")
                    continue
job_metrics = {
"dataproc_job_config": dataproc_job_config,
"dataproc_cluster_config": dataproc_cluster_config,
"primary_machine_config": primary_machine_type_config,
"secondary_machine_config": secondary_machine_type_config,
"yarn_metrics": yarn_metrics,
}
        logger.debug(job_metrics)
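        # clean_up_keys / clean_up_values are helpers defined elsewhere in this
        # module; they are assumed to normalize field names and values so the
        # records load cleanly into BigQuery.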
job_metrics = clean_up_keys(job_metrics)
job_metrics = clean_up_values(job_metrics)
all_metrics.append(job_metrics)
if all_metrics:
# -------------------------------------------------
# Upload results to GCS
# -------------------------------------------------
upload_json_to_gcs(bucket_name=bucket_name,
blob_name=blob_name,
data=all_metrics)
# -------------------------------------------------
# Load results into BigQuery
# -------------------------------------------------
load_metrics_to_bigquery(
project_id=project_id,
bq_dataset=bq_dataset,
bq_table=bq_table,
bucket_name=bucket_name,
blob_name=blob_name,
)
logger.info("Metric collection complete.")
    else:
        logger.warning("No Dataproc jobs found in the specified timeframe.")