in src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java [430:518]
/**
 * Drives the workflow through the history attached to {@code decisionTask}, one batch of decision
 * events at a time.
 *
 * <p>For every batch returned by the history iterator this method updates the replay flag and
 * replay clock on the context, passes the batch's markers (except local activity markers) and
 * then its events to {@code processEvent}, runs the event loop, and finally passes the batch's
 * decision events to {@code processEvent}.
 *
 * @param decisionTask the polled decision task whose history is processed
 * @param query optional callback; when non-null it is invoked from the {@code finally} block, so
 *     it runs both after successful processing and after a failure
 * @return the result of the last {@code processEventLoop} call; {@code false} when there was no
 *     batch to process or when an {@link Error} was turned into a workflow failure
 * @throws IllegalStateException if the next decision event id tracked by the decisions helper
 *     does not line up with the previous started event id reported by the history
 * @throws Throwable anything raised while processing, including an {@link Error} that is rethrown
 *     because the non-deterministic workflow policy is not {@code FailWorkflow}
 */
private boolean decideImpl(PollForDecisionTaskResponse decisionTask, Functions.Proc query)
    throws Throwable {
  boolean newDecisionTaskRequired = false;
  try {
    final long decideStartMillis = System.currentTimeMillis();
    DecisionTaskWithHistoryIterator historySource =
        new DecisionTaskWithHistoryIteratorImpl(
            decisionTask, Duration.ofSeconds(startedEvent.getTaskStartToCloseTimeoutSeconds()));
    HistoryHelper historyHelper =
        new HistoryHelper(historySource, context.getReplayCurrentTimeMilliseconds());
    DecisionEventsIterator batches = historyHelper.getIterator();

    // Sanity check: the decider state must be positioned where this history continues.
    // The "+ 2" is there because getNextDecisionEventId() skips over the completed event.
    if (decisionsHelper.getNextDecisionEventId()
        != historyHelper.getPreviousStartedEventId() + 2) {
      // A zero id on either side, or a task without history events, is not treated as a mismatch.
      if (decisionsHelper.getNextDecisionEventId() != 0
          && historyHelper.getPreviousStartedEventId() != 0
          && decisionTask.getHistory().getEventsSize() > 0) {
        String message =
            String.format(
                "ReplayDecider expects next event id at %d. History's previous started event id is %d",
                decisionsHelper.getNextDecisionEventId(),
                historyHelper.getPreviousStartedEventId());
        throw new IllegalStateException(message);
      }
    }

    while (batches.hasNext()) {
      DecisionEvents batch = batches.next();
      context.setReplaying(batch.isReplay());
      context.setReplayCurrentTimeMilliseconds(batch.getReplayCurrentTimeMilliseconds());
      decisionsHelper.handleDecisionTaskStartedEvent(batch);

      // Markers go first: their data has to be cached before the events that need it are
      // processed. Local activity markers are skipped in this pass.
      for (HistoryEvent marker : batch.getMarkers()) {
        String markerName = marker.getMarkerRecordedEventAttributes().getMarkerName();
        if (markerName.equals(ClockDecisionContext.LOCAL_ACTIVITY_MARKER_NAME)) {
          continue;
        }
        processEvent(marker);
      }

      for (HistoryEvent historyEvent : batch.getEvents()) {
        processEvent(historyEvent);
      }

      newDecisionTaskRequired =
          processEventLoop(
              decideStartMillis,
              startedEvent.getTaskStartToCloseTimeoutSeconds(),
              batch,
              decisionTask.getQuery() != null);
      mayBeCompleteWorkflow();

      if (batch.isReplay()) {
        decisionsHelper.notifyDecisionSent();
      }

      // Feed the outcome of the previously made decisions into the state machines.
      for (HistoryEvent decisionEvent : batch.getDecisionEvents()) {
        processEvent(decisionEvent);
      }

      // Put the helper back into the state it had before the event loop ran.
      decisionsHelper.handleDecisionTaskStartedEvent(batch);
    }

    if (newDecisionTaskRequired) {
      metricsScope.counter(MetricsType.DECISION_TASK_FORCE_COMPLETED).inc(1);
    }
    return newDecisionTaskRequired;
  } catch (Error error) {
    boolean failWorkflowOnError =
        this.workflow.getWorkflowImplementationOptions().getNonDeterministicWorkflowPolicy()
            == FailWorkflow;
    if (!failWorkflowOnError) {
      // Only the decision fails here, not the workflow: count it and let the error propagate.
      metricsScope.counter(MetricsType.DECISION_TASK_ERROR_COUNTER).inc(1);
      throw error;
    }
    // Policy asks for the workflow itself to fail: record the mapped failure and complete it.
    failure = workflow.mapError(error);
    completed = true;
    completeWorkflow();
    return false;
  } finally {
    // The query callback runs on every exit path, before the decider is closed.
    if (query != null) {
      query.apply();
    }
    if (completed) {
      close();
    }
  }
}