protected void processMessage()

in gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java [195:284]


  protected void processMessage(DecodeableKafkaRecord<byte[], byte[]> message) {
    GobblinTrackingEvent gobblinTrackingEvent = deserializeEvent(message);

    if (gobblinTrackingEvent == null) {
      return;
    }

    if (IssueEventBuilder.isIssueEvent(gobblinTrackingEvent)) {
      try (Timer.Context context = getMetricContext().timer(PROCESS_JOB_ISSUE).time()) {
        jobIssueEventHandler.processEvent(gobblinTrackingEvent);
      }
    }

    try {
      persistJobStatusRetryer.call(() -> {
        // re-create `jobStatus` on each attempt, since it is mutated within `addJobStatusToStateStore`
        // (a sketch of the retryer driving these attempts follows this method)
        org.apache.gobblin.configuration.State jobStatus = parseJobStatus(gobblinTrackingEvent);
        if (jobStatus == null) {
          return null;
        }

        try (Timer.Context context = getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
          Pair<org.apache.gobblin.configuration.State, NewState> updatedJobStatus = recalcJobStatus(jobStatus, this.stateStore);
          jobStatus = updatedJobStatus.getLeft();

          String flowName = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
          String flowGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
          long flowExecutionId = jobStatus.getPropAsLong(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
          String jobName = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
          String jobGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
          String storeName = jobStatusStoreName(flowGroup, flowName);
          String tableName = jobStatusTableName(flowExecutionId, jobGroup, jobName);
          String status = jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);

          // Modify the status to PENDING_RETRY only after computing `updatedJobStatus` via recalcJobStatus(),
          // because ObservabilityEventProducer does not (and should not) understand the PENDING_RETRY status in
          // convertExecutionStatusTojobState(), which is called inside emitObservabilityEvent().
          // This could also be addressed by a new job status such as FAILED_PENDING_RETRY, which would alarm the
          // user less than FAILED does, if we choose to emit an ObservabilityEvent for it.
          // (A hypothetical sketch of this retry check appears after this method.)
          boolean retryRequired = modifyStateIfRetryRequired(jobStatus);

          if (updatedJobStatus.getRight() == NewState.FINISHED && !retryRequired) {
            // do not send the event if a retry is required, because it could prompt users to re-submit a job that GaaS has already scheduled for retry
            this.eventProducer.emitObservabilityEvent(jobStatus);
          }

          if (DagProcUtils.isJobLevelStatus(jobName)) {
            if (updatedJobStatus.getRight() == NewState.FINISHED) {
              try {
                this.dagManagementStateStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
              } catch (IOException e) {
                if (ExceptionUtils.isExceptionInstanceOf(e, nonRetryableExceptions)) {
                  this.dagManagementStateStore.getDagManagerMetrics().dagActionCreationExceptionsInJobStatusMonitor.mark();
                  log.error("Could not add REEVALUATE dag action for flow group - {}, flow name - {}, flowExecutionId - {}, "
                      + "jobName - {} due to {}. Ignoring...", flowGroup, flowName, flowExecutionId, jobName, e.getMessage());
                } else {
                  throw e;
                }
              }
            } else if (updatedJobStatus.getRight() == NewState.RUNNING) {
              DagProcUtils.removeEnforceJobStartDeadlineDagAction(dagManagementStateStore, flowGroup, flowName, flowExecutionId, jobName);
            }
            // In case the job is cancelled before it ever started, we need to clean up its enforceJobStartDeadlineDagAction
            if (status != null && ExecutionStatus.valueOf(status) == ExecutionStatus.CANCELLED) {
              DagProcUtils.removeEnforceJobStartDeadlineDagAction(dagManagementStateStore, flowGroup, flowName, flowExecutionId, jobName);
            }
          }

          // update the state store only after adding the dag action, to guarantee at-least-once addition of the dag action
          stateStore.put(storeName, tableName, jobStatus);
        }
        return null;
      });
    } catch (ExecutionException ee) {
      String msg = String.format("Failed to add job status to state store for kafka offset %d", message.getOffset());
      log.warn(msg, ee);
      // Throw RuntimeException to avoid advancing Kafka offsets without updating the state store
      throw new RuntimeException(msg, ee.getCause());
    } catch (RetryException re) {
      String interruptedNote = Thread.currentThread().isInterrupted() ? "... then interrupted" : "";
      String msg = String.format("Failed to add job status to state store for kafka offset %d (retried %d times%s)",
          message.getOffset(), re.getNumberOfFailedAttempts(), interruptedNote);
      Throwable informativeException = re.getLastFailedAttempt().hasException()
          ? re.getLastFailedAttempt().getExceptionCause()
          : re;
      log.warn(msg, informativeException);
      // Throw RuntimeException to avoid advancing Kafka offsets without updating the state store
      throw new RuntimeException(msg, informativeException);
    }
  }
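
The `ExecutionException` and `RetryException` caught above are the two failure modes of the guava-retrying library's `Retryer.call()`: broadly, a failure the retryer will not retry surfaces wrapped in `ExecutionException`, while exhausting the stop strategy surfaces as a `RetryException` carrying the failed-attempt count logged above. A minimal sketch of how such a `persistJobStatusRetryer` could be built; the wait and stop values here are illustrative assumptions, since the actual monitor derives its retry policy from configuration:

  import java.util.concurrent.TimeUnit;

  import com.github.rholder.retry.Retryer;
  import com.github.rholder.retry.RetryerBuilder;
  import com.github.rholder.retry.StopStrategies;
  import com.github.rholder.retry.WaitStrategies;

  // Illustrative policy only; the real monitor reads its retry settings from configuration.
  Retryer<Void> persistJobStatusRetryer = RetryerBuilder.<Void>newBuilder()
      .retryIfException()                                     // retry the Callable on any Exception
      .withWaitStrategy(WaitStrategies.exponentialWait(10_000, TimeUnit.MILLISECONDS)) // exponential backoff capped at 10s
      .withStopStrategy(StopStrategies.stopAfterAttempt(5))   // after 5 failed attempts, call() throws RetryException
      .build();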
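
For the PENDING_RETRY ordering discussed in the comment before `modifyStateIfRetryRequired(jobStatus)`, here is a hypothetical sketch of what such a check could look like. The attempt-count fields and the exact condition are illustrative assumptions, not the actual Gobblin implementation:

  // Hypothetical sketch, not the actual Gobblin implementation: flips a FAILED job to
  // PENDING_RETRY while retry attempts remain, and reports whether it did so.
  // The MAX_ATTEMPTS_FIELD / CURRENT_ATTEMPTS_FIELD usage and defaults are assumptions.
  private static boolean modifyStateIfRetryRequiredSketch(org.apache.gobblin.configuration.State jobStatus) {
    int maxAttempts = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, 1);
    int currentAttempts = jobStatus.getPropAsInt(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, 1);
    boolean retryRequired = ExecutionStatus.FAILED.name().equals(jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD))
        && currentAttempts < maxAttempts;
    if (retryRequired) {
      // Mutate the status only after recalcJobStatus() and the observability decision are made,
      // so ObservabilityEventProducer never has to interpret PENDING_RETRY.
      jobStatus.setProp(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.PENDING_RETRY.name());
    }
    return retryRequired;
  }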