in src/main/java/com/amazonaws/services/simpleworkflow/flow/worker/HistoryHelper.java [93:150]
private void fillNext() {
boolean decisionTaskTimedOut = false;
List<HistoryEvent> decisionStartToCompletionEvents = new ArrayList<>();
List<HistoryEvent> decisionCompletionToStartEvents = new ArrayList<>();
boolean concurrentToDecision = true;
int lastDecisionIndex = -1;
long nextReplayCurrentTimeMilliseconds = -1;
while (events.hasNext()) {
HistoryEvent event = events.next();
EventType eventType = EventType.fromValue(event.eventTypeAsString());
if (eventType == EventType.DECISION_TASK_COMPLETED) {
String executionContext = event.decisionTaskCompletedEventAttributes().executionContext();
updateWorkflowContextDataAndComponentVersions(executionContext);
concurrentToDecision = false;
}
else if (eventType == EventType.DECISION_TASK_STARTED) {
nextReplayCurrentTimeMilliseconds = event.eventTimestamp().toEpochMilli();
if (decisionTaskTimedOut) {
current.getDecisionEvents().addAll(decisionStartToCompletionEvents);
decisionStartToCompletionEvents = new ArrayList<>();
decisionTaskTimedOut = false;
}
else {
break;
}
}
else if (eventType.equals(EventType.DECISION_TASK_TIMED_OUT)) {
DecisionTaskTimeoutType timeoutType = DecisionTaskTimeoutType.valueOf(event.decisionTaskTimedOutEventAttributes().timeoutTypeAsString());
// Ignore DecisionTaskTimedOut events caused by schedule-to-start timeout,
// as they don't have corresponding DecisionTaskStarted events.
if (timeoutType == DecisionTaskTimeoutType.START_TO_CLOSE) {
decisionTaskTimedOut = true;
}
}
else if (eventType == EventType.DECISION_TASK_SCHEDULED) {
// skip
}
else if (eventType == EventType.MARKER_RECORDED) {
// ignore
}
else if (eventType == EventType.RECORD_MARKER_FAILED) {
// ignore
}
else {
if (concurrentToDecision) {
decisionStartToCompletionEvents.add(event);
}
else {
if (isDecisionEvent(eventType)) {
lastDecisionIndex = decisionCompletionToStartEvents.size();
}
decisionCompletionToStartEvents.add(event);
}
}
}
List<HistoryEvent> nextEvents = reorderEvents(decisionStartToCompletionEvents, decisionCompletionToStartEvents, lastDecisionIndex);
next = new SingleDecisionData(nextEvents, nextReplayCurrentTimeMilliseconds, workflowContextData);
}