in gcpdiag/runbook/dataproc/spark_job_failures.py
def execute(self):
"""Check for logs indicating shuffle failures."""
project = crm.get_project(op.get(flags.PROJECT_ID))
cluster_name = op.get(flags.CLUSTER_NAME)
cluster_uuid = op.get(flags.CLUSTER_UUID)
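# Cloud Logging filter matching known shuffle failure signatures for this job.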
log_search_filter = f"""resource.type="cloud_dataproc_cluster"
resource.labels.cluster_name="{cluster_name}"
resource.labels.cluster_uuid="{cluster_uuid}"
"{op.get(flags.JOB_ID)}"
(
("ExecutorLostFailure" AND "Unable to create executor" AND "Unable to register with external shuffle server") OR
("java.io.IOException" AND "Exception while uploading shuffle data") OR
("Requesting driver to remove executor" AND "Container from a bad node")
) """
if op.get(flags.JOB_OLDER_THAN_30_DAYS):
op.add_skipped(
project,
reason=('Job is older than 30 days; skipping this step. '
'Please create a new job and run the runbook again.'),
)
return
start_time = op.get(flags.START_TIME)
end_time = op.get(flags.END_TIME)
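# Search Cloud Logging for shuffle failure messages within the job's time window.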
log_entries = logs.realtime_query(
project_id=project.id,
filter_str=log_search_filter,
start_time=start_time,
end_time=end_time,
)
if log_entries:
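# Shuffle failures were logged; inspect the cluster configuration for likely root causes.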
cluster = dataproc.get_cluster(cluster_name=op.get(flags.CLUSTER_NAME),
region=op.get(flags.REGION),
project=op.get(flags.PROJECT_ID))
root_causes = []
remediation = []
# Check for insufficient primary workers in EFM
if (cluster.config.software_config.properties.get(
'dataproc:dataproc.enable.enhanced.flexibility.mode',
'false') == 'true'):
if (cluster.number_of_secondary_workers > 0 and
cluster.number_of_primary_workers /
cluster.number_of_secondary_workers < 1):
root_causes.append('Insufficient primary workers in EFM.')
remediation.append(
'Consider increasing the primary to secondary worker ratio.')
# Check for older image and suggest EFM HCFS mode
if (cluster.config.software_config.image_version.startswith('1.5') and
cluster.config.software_config.properties.get(
'dataproc:efm.spark.shuffle') != 'hcfs'):
remediation.append(
'Consider using EFM HCFS mode with GCS for older images.')
# Check for small disk size
disk_size_gb = cluster.config.worker_config.disk_config.boot_disk_size_gb
if disk_size_gb < 500:
root_causes.append(
f'Small disk size ({disk_size_gb} GB) on cluster nodes.')
remediation.append(
'Consider increasing disk size for better I/O performance.')
# Check for low IO connection timeout
# Property values are strings and may include a unit suffix (e.g. '600s').
spark_shuffle_io_timeout = cluster.config.software_config.properties.get(
'spark:spark.shuffle.io.connectionTimeout', '120')
if int(spark_shuffle_io_timeout.rstrip('s')) < 600:
root_causes.append('Low IO connection timeout in Spark shuffle.')
remediation.append(
"Consider increasing 'spark:spark.shuffle.io.connectionTimeout' to"
' 600.')
# Check for data skew and large partitions with PVM secondary workers
if cluster.is_preemptible_secondary_workers:
root_causes.append(
'Data skew and large partitions might be an issue with PVM'
' secondary workers.')
remediation.append(
'Consider using smaller batches, increasing partition count, or'
' using a better partitioning key.')
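# Report the identified root causes together with the suggested remediation.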
op.add_failed(
project,
reason=op.prep_msg(
op.FAILURE_REASON,
cluster_name=cluster_name,
root_causes=', '.join(root_causes),
),
remediation=op.prep_msg(op.FAILURE_REMEDIATION,
remediation=', '.join(remediation)),
)
else:
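# No shuffle failure messages were found for this job.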
op.add_ok(
project,
reason=op.prep_msg(
op.SUCCESS_REASON,
cluster_name=cluster_name,
),
)