def execute()

in gcpdiag/runbook/dataproc/spark_job_failures.py [0:0]


  def execute(self):
    """Check for logs indicating shuffle failures."""

    project = crm.get_project(op.get(flags.PROJECT_ID))

    cluster_name = op.get(flags.CLUSTER_NAME)
    cluster_uuid = op.get(flags.CLUSTER_UUID)

    log_search_filter = f"""resource.type="cloud_dataproc_cluster"
    resource.labels.cluster_name="{cluster_name}"
    resource.labels.cluster_uuid="{cluster_uuid}"
    "{op.get(flags.JOB_ID)}"
    (
      ("ExecutorLostFailure" AND "Unable to create executor" AND "Unable to register with external shuffle server") OR
      ("java.io.IOException" AND "Exception while uploading shuffle data") OR
      ("Requesting driver to remove executor" AND "Container from a bad node")
    ) """

    if op.get(flags.JOB_OLDER_THAN_30_DAYS):
      op.add_skipped(
          project,
          reason=('Job is older than 30 days, skipping this step. '
                  'Please create a new job and run the runbook again.'),
      )
      return

    start_time = op.get(flags.START_TIME)
    end_time = op.get(flags.END_TIME)

    log_entries = logs.realtime_query(
        project_id=project.id,
        filter_str=log_search_filter,
        start_time=start_time,
        end_time=end_time,
    )

    if log_entries:
      cluster = dataproc.get_cluster(cluster_name=op.get(flags.CLUSTER_NAME),
                                     region=op.get(flags.REGION),
                                     project=op.get(flags.PROJECT_ID))

      root_causes = []
      remediation = []

      # Check for insufficient primary workers in EFM
      if (cluster.config.software_config.properties.get(
          'dataproc:dataproc.enable.enhanced.flexibility.mode',
          'false') == 'true'):
        if (cluster.number_of_primary_workers /
            cluster.number_of_secondary_workers < 1):
          root_causes.append('Insufficient primary workers in EFM.')
          remediation.append(
              'Consider increasing the primary to secondary worker ratio.')

      # Check for older image and suggest EFM HCFS mode
      if (cluster.config.software_config.image_version.startswith('1.5') and
          cluster.config.software_config.properties.get(
              'dataproc:efm.spark.shuffle') != 'hcfs'):
        remediation.append(
            'Consider using EFM HCFS mode with GCS for older images.')

      # Check for small disk size
      disk_size_gb = cluster.config.worker_config.disk_config.boot_disk_size_gb
      if disk_size_gb < 500:
        root_causes.append(
            f'Small disk size ({disk_size_gb} GB) on cluster nodes.')
        remediation.append(
            'Consider increasing disk size for better I/O performance.')

      # Check for low IO connection timeout
      spark_shuffle_io_timeout = cluster.config.software_config.properties.get(
          'spark:spark.shuffle.io.connectionTimeout', 120)
      if spark_shuffle_io_timeout < 600:
        root_causes.append('Low IO connection timeout in Spark shuffle.')
        remediation.append(
            "Consider increasing 'spark:spark.shuffle.io.connectionTimeout' to"
            ' 600.')

      # Check for data skew and large partitions with PVM secondary workers
      if cluster.is_preemptible_secondary_workers:
        root_causes.append(
            'Data skew and large partitions might be an issue with PVM'
            ' secondary workers.')
        remediation.append(
            'Consider using smaller batches, increasing partition count, or'
            ' using a better partitioning key.')

      op.add_failed(
          crm.get_project(project.id),
          reason=op.prep_msg(
              op.FAILURE_REASON,
              cluster_name=cluster_name,
              root_causes=', '.join(root_causes),
          ),
          remediation=op.prep_msg(op.FAILURE_REMEDIATION,
                                  remediation=', '.join(remediation)),
      )
    else:
      op.add_ok(
          crm.get_project(project.id),
          reason=op.prep_msg(
              op.SUCCESS_REASON,
              cluster_name=cluster_name,
          ),
      )