in gcpdiag/runbook/dataproc/spark_job_failures.py [0:0]
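# Context: `execute` below is a method on a runbook step class; the file's
# imports are not part of this excerpt. The paths here are assumptions based
# on the typical gcpdiag layout, not copied from the source:
#
#   from datetime import datetime, timedelta, timezone
#
#   from gcpdiag.queries import crm, dataproc
#   from gcpdiag.runbook import op
#   from gcpdiag.runbook.dataproc import flags
#   from gcpdiag.utils import GcpApiError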
def execute(self):
  """Verify job exists in Dataproc UI."""
  project = crm.get_project(op.get(flags.PROJECT_ID))
  # Fail early if the required job_id or region parameter is missing.
  if not op.get(flags.JOB_ID) or not op.get(flags.REGION):
    op.add_failed(
        project,
        reason=op.prep_msg(
            op.FAILURE_REASON,
            project_id=project,
            job_id=op.get(flags.JOB_ID),
            cluster_name=op.get(flags.CLUSTER_NAME),
        ),
        remediation=op.prep_msg(op.FAILURE_REMEDIATION),
    )
    return
  # Fetch the job through the Dataproc API; its metadata carries the
  # cluster name and UUID that later steps need.
  try:
    job = dataproc.get_job_by_jobid(project_id=op.get(flags.PROJECT_ID),
                                    region=op.get(flags.REGION),
                                    job_id=op.get(flags.JOB_ID))
  except (AttributeError, GcpApiError, IndexError, TypeError,
          ValueError) as e:
    # The job could not be found (or the response was malformed):
    # record that and fail the step.
    op.put(flags.JOB_EXIST, 'false')
    op.add_failed(
        project,
        reason=op.prep_msg(
            op.FAILURE_REASON,
            project_id=project,
            job_id=op.get(flags.JOB_ID),
            cluster_name=op.get(flags.CLUSTER_NAME),
            error=e,
        ),
        remediation=op.prep_msg(op.FAILURE_REMEDIATION),
    )
    return
  # Start time is when the job entered the RUNNING state. This assumes the
  # job reached RUNNING; status_history maps job states to timestamps.
  start_time = datetime.strptime(
      job.status_history['RUNNING'],
      '%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=timezone.utc)
  # End time bounds the inspection window at seven days after the start.
  end_time = start_time + timedelta(days=7)
  # Save the time window and cluster parameters for later steps.
  op.put(flags.START_TIME, start_time)
  op.info(f'Start time utc:{start_time}')
  op.put(flags.END_TIME, end_time)
  op.info(f'End time utc:{end_time}')
  op.put(flags.CLUSTER_UUID, job.cluster_uuid)
  op.put(flags.CLUSTER_NAME, job.cluster_name)
  # Flag jobs older than 30 days; their logs may have aged out of the
  # default Cloud Logging retention window.
  if check_datetime_gap(op.get(flags.START_TIME), op.get(flags.END_TIME), 30):
    op.put(flags.JOB_OLDER_THAN_30_DAYS, True)
  else:
    op.put(flags.JOB_OLDER_THAN_30_DAYS, False)
  op.add_ok(
      project,
      reason=op.prep_msg(
          op.SUCCESS_REASON,
          project_id=project,
          job_id=op.get(flags.JOB_ID),
          cluster_name=op.get(flags.CLUSTER_NAME),
      ),
  )
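
# NOTE: check_datetime_gap is defined elsewhere in gcpdiag and is not part
# of this excerpt. A minimal sketch of what it plausibly does — the
# signature and semantics below are inferred from the call site above
# (it feeds flags.JOB_OLDER_THAN_30_DAYS), not copied from the source:
def check_datetime_gap(start_time, end_time, days):
  """Sketch: True when the job's window lies more than `days` days ago."""
  cutoff = datetime.now(timezone.utc) - timedelta(days=days)
  return start_time < cutoff and end_time < cutoff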