in gcpdiag/runbook/dataproc/spark_job_failures.py [0:0]
def build_tree(self):
"""Dataproc Spark Job Failures debug tree."""
# Instantiate your step classes
job_exist = JobExists()
self.add_start(job_exist)
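# JobExists is the root of the tree: add_start registers it as the entry
# point, and every later check is attached beneath it via add_step.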
# Check cluster exists
cluster_exists = DataProcClusterExists()
self.add_step(parent=job_exist, child=cluster_exists)
# Check if Stackdriver logging is enabled
stackdriver_setting = CheckStackdriverSetting()
self.add_step(parent=cluster_exists, child=stackdriver_setting)
# Check cluster version
cluster_version = CheckClusterVersion()
self.add_step(parent=stackdriver_setting, child=cluster_version)
# Check if job failed
check_if_job_failed = CheckIfJobFailed()
self.add_step(parent=cluster_version, child=check_if_job_failed)
# Check if job failed due to task not found
check_task_not_found = CheckTaskNotFound()
self.add_step(parent=check_if_job_failed, child=check_task_not_found)
# Check if permissions meet requirements
check_permissions = CheckPermissions()
self.add_step(parent=check_task_not_found, child=check_permissions)
# Check if Master OOM happened
check_master_oom = CheckMasterOOM()
self.add_step(parent=check_task_not_found, child=check_master_oom)
# Check if worker OOM happened
check_worker_oom = CheckWorkerOOM()
self.add_step(parent=check_master_oom, child=check_worker_oom)
# Check if secondary worker preemption happened
check_sw_preemption = CheckSWPreemption()
self.add_step(parent=check_worker_oom, child=check_sw_preemption)
# Check if a worker disk usage issue happened
check_worker_disk_usage_issue = CheckWorkerDiskUsageIssue()
self.add_step(parent=check_worker_oom, child=check_worker_disk_usage_issue)
# Check cluster network connection
check_cluster_network = dp_gs.CheckClusterNetworkConnectivity()
self.add_step(parent=check_worker_oom, child=check_cluster_network)
# Check if job failed due to port exhaustion
check_port_exhaustion = CheckPortExhaustion()
self.add_step(parent=check_cluster_network, child=check_port_exhaustion)
# Check if an orphaned application was killed
check_killing_orphaned_application = CheckKillingOrphanedApplication()
self.add_step(parent=check_cluster_network,
child=check_killing_orphaned_application)
# Check if python import failure happened
check_python_import_failure = CheckPythonImportFailure()
self.add_step(
parent=check_cluster_network,
child=check_python_import_failure,
)
# Check for logs indicating shuffle failures
check_shuffle_failures = CheckShuffleFailures()
self.add_step(parent=check_cluster_network, child=check_shuffle_failures)
# Check if shuffle fetch failed with an 'executor is not registered' error
check_shuffle_service_kill = CheckShuffleServiceKill()
self.add_step(parent=check_shuffle_failures,
child=check_shuffle_service_kill)
# Check if GC pause happened on the cluster
check_if_gc_pause_happened = CheckGCPause()
self.add_step(
parent=check_shuffle_failures,
child=check_if_gc_pause_happened,
)
# Check YarnRuntimeException
check_yarn_runtime = CheckYarnRuntimeException()
self.add_step(parent=check_shuffle_failures, child=check_yarn_runtime)
# Check Job Throttling messages
check_job_throttling = CheckJobThrottling()
self.add_step(parent=check_shuffle_failures, child=check_job_throttling)
# Check GCS Connector
check_gcs_connector = CheckGCSConnector()
self.add_step(parent=check_shuffle_failures, child=check_gcs_connector)
# Check BQ Connector
check_bq_connector = CheckBQConnector()
self.add_step(parent=check_shuffle_failures, child=check_bq_connector)
# Ending your runbook
self.add_end(SparkJobEnd())
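# Note on the tree shape: steps that share a parent run as independent
# sibling branches rather than in sequence - e.g. CheckWorkerOOM fans out to
# the preemption, disk-usage and network checks, and CheckShuffleFailures
# fans out to the shuffle-service, GC, YARN, throttling and connector checks.
#
# Illustrative invocation (a sketch only; the runbook id, flag and parameter
# names below are assumptions and may differ by gcpdiag version):
#   gcpdiag runbook dataproc/spark-job-failures --project=<PROJECT_ID> \
#       -p job_id=<JOB_ID> -p region=<REGION>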