in src/main/java/com/uber/cadence/internal/replay/HistoryHelper.java [187:242]
public DecisionEvents next() {
List<HistoryEvent> decisionEvents = new ArrayList<>();
List<HistoryEvent> newEvents = new ArrayList<>();
boolean replay = true;
long nextDecisionEventId = -1;
while (events.hasNext()) {
HistoryEvent event = events.next();
EventType eventType = event.getEventType();
// Sticky workers receive an event history that starts with DecisionTaskCompleted
if (eventType == EventType.DecisionTaskCompleted && nextDecisionEventId == -1) {
nextDecisionEventId = event.getEventId() + 1;
break;
}
if (eventType == EventType.DecisionTaskStarted || !events.hasNext()) {
replayCurrentTimeMilliseconds = TimeUnit.NANOSECONDS.toMillis(event.getTimestamp());
if (!events.hasNext()) {
replay = false;
nextDecisionEventId = event.getEventId() + 2; // +1 for next, +1 for DecisionCompleted
break;
}
HistoryEvent peeked = events.peek();
EventType peekedType = peeked.getEventType();
if (peekedType == EventType.DecisionTaskTimedOut
|| peekedType == EventType.DecisionTaskFailed) {
continue;
} else if (peekedType == EventType.DecisionTaskCompleted) {
events.next(); // consume DecisionTaskCompleted
nextDecisionEventId = peeked.getEventId() + 1; // +1 for next and skip over completed
break;
} else {
throw new Error(
"Unexpected event after DecisionTaskStarted: "
+ peeked
+ " DecisionTaskStarted Event: "
+ event);
}
}
newEvents.add(event);
}
while (events.hasNext()) {
if (!WorkflowExecutionUtils.isDecisionEvent(events.peek())) {
break;
}
decisionEvents.add(events.next());
}
DecisionEvents result =
new DecisionEvents(
newEvents,
decisionEvents,
replay,
replayCurrentTimeMilliseconds,
nextDecisionEventId);
return result;
}