in gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java [195:284]
protected void processMessage(DecodeableKafkaRecord<byte[], byte[]> message) {
  GobblinTrackingEvent gobblinTrackingEvent = deserializeEvent(message);
  if (gobblinTrackingEvent == null) {
    return;
  }
  if (IssueEventBuilder.isIssueEvent(gobblinTrackingEvent)) {
    try (Timer.Context context = getMetricContext().timer(PROCESS_JOB_ISSUE).time()) {
      jobIssueEventHandler.processEvent(gobblinTrackingEvent);
    }
  }
  try {
    persistJobStatusRetryer.call(() -> {
      // re-create `jobStatus` on each attempt, since it is mutated within `addJobStatusToStateStore`
      org.apache.gobblin.configuration.State jobStatus = parseJobStatus(gobblinTrackingEvent);
      if (jobStatus == null) {
        return null;
      }
      try (Timer.Context context = getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
        Pair<org.apache.gobblin.configuration.State, NewState> updatedJobStatus = recalcJobStatus(jobStatus, this.stateStore);
        jobStatus = updatedJobStatus.getLeft();
        String flowName = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
        String flowGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
        long flowExecutionId = jobStatus.getPropAsLong(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
        String jobName = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
        String jobGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
        String storeName = jobStatusStoreName(flowGroup, flowName);
        String tableName = jobStatusTableName(flowExecutionId, jobGroup, jobName);
        String status = jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
        // Modify the status to PENDING_RETRY only after computing `updatedJobStatus` via `recalcJobStatus()`,
        // because ObservabilityEventProducer does not (and should not) understand the PENDING_RETRY status in
        // convertExecutionStatusTojobState(), which is called inside emitObservabilityEvent().
        // This could also be addressed by introducing a new job status such as FAILED_PENDING_RETRY, which would
        // not alarm the user as much as FAILED does, if we chose to emit an ObservabilityEvent for FAILED_PENDING_RETRY.
        boolean retryRequired = modifyStateIfRetryRequired(jobStatus);
        if (updatedJobStatus.getRight() == NewState.FINISHED && !retryRequired) {
          // do not emit the event if a retry is required, because it could prompt users to re-submit a job that GaaS is already set to retry
          this.eventProducer.emitObservabilityEvent(jobStatus);
        }
        if (DagProcUtils.isJobLevelStatus(jobName)) {
          if (updatedJobStatus.getRight() == NewState.FINISHED) {
            try {
              this.dagManagementStateStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
            } catch (IOException e) {
              if (ExceptionUtils.isExceptionInstanceOf(e, nonRetryableExceptions)) {
                this.dagManagementStateStore.getDagManagerMetrics().dagActionCreationExceptionsInJobStatusMonitor.mark();
                log.error("Could not add REEVALUATE dag action for flow group - {}, flow name - {}, flowExecutionId - {}, "
                    + "jobName = {} due to {}. Ignoring...", flowGroup, flowName, flowExecutionId, jobName, e.getMessage());
              } else {
                throw e;
              }
            }
          } else if (updatedJobStatus.getRight() == NewState.RUNNING) {
            DagProcUtils.removeEnforceJobStartDeadlineDagAction(dagManagementStateStore, flowGroup, flowName, flowExecutionId, jobName);
          }
          // in case the job was cancelled before it started, we also need to clean up its enforceJobStartDeadline dag action
          if (status != null && ExecutionStatus.valueOf(status).equals(ExecutionStatus.CANCELLED)) {
            DagProcUtils.removeEnforceJobStartDeadlineDagAction(dagManagementStateStore, flowGroup, flowName, flowExecutionId, jobName);
          }
        }
        // update the state store only after adding the dag action, to guarantee at-least-once addition of the dag action
        stateStore.put(storeName, tableName, jobStatus);
      }
      return null;
    });
  } catch (ExecutionException ee) {
    String msg = String.format("Failed to add job status to state store for kafka offset %d", message.getOffset());
    log.warn(msg, ee);
    // Throw RuntimeException to avoid advancing Kafka offsets without updating the state store
    throw new RuntimeException(msg, ee.getCause());
  } catch (RetryException re) {
    String interruptedNote = Thread.currentThread().isInterrupted() ? "... then interrupted" : "";
    String msg = String.format("Failed to add job status to state store for kafka offset %d (retried %d times%s)",
        message.getOffset(), re.getNumberOfFailedAttempts(), interruptedNote);
    Throwable informativeException = re.getLastFailedAttempt().hasException()
        ? re.getLastFailedAttempt().getExceptionCause()
        : re;
    log.warn(msg, informativeException);
    // Throw RuntimeException to avoid advancing Kafka offsets without updating the state store
    throw new RuntimeException(msg, informativeException);
  }
}
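
The retry handling above matches the guava-retrying `Retryer` API: `persistJobStatusRetryer.call(...)` throws `ExecutionException` for a failure the retryer declines to retry and `RetryException` once attempts are exhausted, with `Attempt` supplying the `hasException()`/`getExceptionCause()` calls seen in the second catch block. A minimal sketch of how such a retryer could be constructed follows; the exception filter, backoff, and attempt cap are illustrative assumptions, not Gobblin's actual configuration:

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;

// Illustrative sketch only: retry transient IOExceptions with exponential
// backoff (each wait capped at 10s), giving up after 5 attempts. The real
// retryer's predicate and limits are configured elsewhere in Gobblin.
Retryer<Void> persistJobStatusRetryer = RetryerBuilder.<Void>newBuilder()
    .retryIfExceptionOfType(IOException.class)
    .withWaitStrategy(WaitStrategies.exponentialWait(10, TimeUnit.SECONDS))
    .withStopStrategy(StopStrategies.stopAfterAttempt(5))
    .build();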
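
Inside the REEVALUATE block, `ExceptionUtils.isExceptionInstanceOf(e, nonRetryableExceptions)` decides which dag-action creation failures are swallowed (marked on the metric and logged) versus rethrown into the retry loop. As a rough sketch of those semantics, assuming the check walks the cause chain: the stand-in below uses commons-lang3 and is a hypothetical illustration, not Gobblin's implementation:

import java.util.List;

// Hypothetical stand-in for Gobblin's ExceptionUtils.isExceptionInstanceOf:
// true if the thrown exception, or any exception in its cause chain, is an
// instance of one of the listed non-retryable types.
static boolean isExceptionInstanceOf(Throwable t, List<Class<? extends Exception>> nonRetryableExceptions) {
  for (Throwable cause : org.apache.commons.lang3.exception.ExceptionUtils.getThrowableList(t)) {
    for (Class<? extends Exception> type : nonRetryableExceptions) {
      if (type.isInstance(cause)) {
        return true;
      }
    }
  }
  return false;
}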