def execute()

in gcpdiag/runbook/dataproc/spark_job_failures.py [0:0]


  def execute(self):
    """Verify if OOM has happened on master.

    Queries the cluster's logs between START_TIME and END_TIME for a
    'Task Not Acquired' entry that mentions the job. If one is found, the step
    is reported as failed when either of the following is also logged in the
    same window:

    * a 'Driver received SIGTERM/SIGKILL signal and exited with' entry that
      mentions the job, or
    * an 'Exception calling Future.get() on YARN metrics rpc' entry on the
      cluster (this query is not restricted to the job id).

    In every other case the step is reported as OK.
    """

    project_id = op.get(flags.PROJECT_ID)
    project = crm.get_project(project_id)

    cluster_name = op.get(flags.CLUSTER_NAME)
    cluster_uuid = op.get(flags.CLUSTER_UUID)
    job_id = op.get(flags.JOB_ID)
    start_time = op.get(flags.START_TIME)
    end_time = op.get(flags.END_TIME)

    def query_logs(filter_str):
      # All three searches share the project and the time window; only the
      # filter text differs.
      return logs.realtime_query(
          project_id=project_id,
          filter_str=filter_str,
          start_time=start_time,
          end_time=end_time,
      )

    def report_ok():
      op.add_ok(
          project,
          reason=op.prep_msg(
              op.SUCCESS_REASON,
              cluster_name=cluster_name,
          ),
      )

    # NOTE: the indentation of the continuation lines inside the filter
    # literals below is part of the string and is deliberately left as it was,
    # so the emitted filter text does not change.
    log_message = 'Task Not Acquired'
    log_search_filter = f"""resource.type="cloud_dataproc_cluster"
    resource.labels.cluster_name="{cluster_name}"
    resource.labels.cluster_uuid="{cluster_uuid}"
    "{job_id}"
    jsonPayload.message=~"{log_message}" """

    if not query_logs(log_search_filter):
      # Without a 'Task Not Acquired' entry none of the follow-up checks apply.
      report_ok()
      return

    log_message_check_sigterm = (
        'Driver received SIGTERM/SIGKILL signal and exited with')
    log_search_filter_check_sigterm = f"""resource.type="cloud_dataproc_cluster"
      resource.labels.cluster_name="{cluster_name}"
      resource.labels.cluster_uuid="{cluster_uuid}"
      "{job_id}"
      jsonPayload.message=~"{log_message_check_sigterm}" """

    if query_logs(log_search_filter_check_sigterm):
      op.add_failed(
          project,
          reason=op.prep_msg(
              op.FAILURE_REASON,
              log=log_message,
              cluster_name=cluster_name,
          ),
          remediation=op.prep_msg(op.FAILURE_REMEDIATION),
      )
      return

    # The filter uses the regular-expression operator (=~), so the '.', '('
    # and ')' of the message 'Exception calling Future.get() on YARN metrics
    # rpc' are wrapped in character classes. Left bare, '()' is an empty group
    # and the pattern would require 'get' to be followed directly by ' on',
    # which the literal message never satisfies.
    log_message_check_yarn_metrics = (
        'Exception calling Future[.]get[(][)] on YARN metrics rpc')
    log_search_filter_check_yarn_metrics = f"""resource.type="cloud_dataproc_cluster"
        resource.labels.cluster_name="{cluster_name}"
        resource.labels.cluster_uuid="{cluster_uuid}"
        jsonPayload.message=~"{log_message_check_yarn_metrics}" """

    if query_logs(log_search_filter_check_yarn_metrics):
      op.add_failed(
          project,
          reason=op.prep_msg(
              op.FAILURE_REASON,
              cluster_name=cluster_name,
          ),
          remediation=op.prep_msg(op.FAILURE_REMEDIATION),
      )
      return

    report_ok()