in gcpdiag/runbook/dataproc/spark_job_failures.py [0:0]
def execute(self):
  """Verify if OOM has happened on the master node.

  Queries Cloud Logging for the cluster (identified by name and UUID) in the
  [START_TIME, END_TIME] window:

  1. Look for "Task Not Acquired" messages mentioning the job id. If none
     are found, the step is reported OK.
  2. Otherwise, fail if the driver logged that it received SIGTERM/SIGKILL.
  3. Otherwise, fail if an exception on the YARN metrics RPC was logged.
  4. If neither follow-up check matches, the step is reported OK.
  """
  project_id = op.get(flags.PROJECT_ID)
  project = crm.get_project(project_id)
  cluster_name = op.get(flags.CLUSTER_NAME)
  cluster_uuid = op.get(flags.CLUSTER_UUID)
  job_id = op.get(flags.JOB_ID)
  start_time = op.get(flags.START_TIME)
  end_time = op.get(flags.END_TIME)

  def _query(message_regex, include_job_id=True):
    """Return cluster log entries whose jsonPayload.message matches the regex."""
    clauses = [
        'resource.type="cloud_dataproc_cluster"',
        f'resource.labels.cluster_name="{cluster_name}"',
        f'resource.labels.cluster_uuid="{cluster_uuid}"',
    ]
    if include_job_id:
      # Bare quoted value: restrict to entries that mention the job id.
      clauses.append(f'"{job_id}"')
    # `=~` is a regex match, so callers must pass a regex, not raw text.
    clauses.append(f'jsonPayload.message=~"{message_regex}"')
    # Separate lines in a logging filter are implicitly ANDed.
    return logs.realtime_query(
        project_id=project_id,
        filter_str='\n'.join(clauses),
        start_time=start_time,
        end_time=end_time,
    )

  log_message = 'Task Not Acquired'
  if _query(log_message):
    sigterm_message = 'Driver received SIGTERM/SIGKILL signal and exited with'
    if _query(sigterm_message):
      op.add_failed(
          project,
          reason=op.prep_msg(
              op.FAILURE_REASON,
              log=log_message,
              cluster_name=cluster_name,
          ),
          remediation=op.prep_msg(op.FAILURE_REMEDIATION),
      )
      return

    # The literal text is "Exception calling Future.get() on YARN metrics rpc".
    # Unescaped, "()" is an empty regex group, so the pattern would require
    # "get on" and never match the real "get() on". Bracket classes match
    # '.', '(' and ')' literally without relying on backslash escaping
    # inside the quoted filter string.
    yarn_metrics_regex = (
        'Exception calling Future[.]get[(][)] on YARN metrics rpc')
    # NOTE(review): this query intentionally (?) omits the job-id term and
    # passes no `log=` kwarg to the reason template; both kept as before.
    if _query(yarn_metrics_regex, include_job_id=False):
      op.add_failed(
          project,
          reason=op.prep_msg(
              op.FAILURE_REASON,
              cluster_name=cluster_name,
          ),
          remediation=op.prep_msg(op.FAILURE_REMEDIATION),
      )
      return

  op.add_ok(
      project,
      reason=op.prep_msg(
          op.SUCCESS_REASON,
          cluster_name=cluster_name,
      ),
  )