in flux/src/main/java/software/amazon/aws/clients/swf/flux/poller/WorkflowState.java [227:482]
/**
 * Builds a {@link WorkflowState} from the history events attached to a decision task.
 *
 * <p>The events are walked once, in the order returned by {@code task.events()}, which is expected to be
 * reverse-chronological (newest first). Because of that ordering, most of the bookkeeping below follows a
 * "first one seen wins" rule: the first event found for a partition, signal, timer or cancellation request
 * is the most recent one, and older events for the same key are ignored.
 *
 * <p>After the walk, the most recent start event determines the current activity, and the latest state of
 * each of that activity's partitions is folded into the {@code currentStep*} fields.
 *
 * @param task the decision task whose workflow execution ids and history events are read
 * @return the populated workflow state
 * @throws BadWorkflowStateException if the history contains no start event, contains no
 *         WorkflowExecutionStarted event, or has no partition state recorded for the current activity
 */
public static WorkflowState build(PollForDecisionTaskResponse task) {
    HistoryEvent mostRecentClosedEvent = null;
    HistoryEvent mostRecentStartedEvent = null;

    WorkflowState ws = new WorkflowState();
    ws.rawPartitionMetadata = new HashMap<>();
    ws.openTimers = new HashMap<>();
    ws.closedTimers = new HashMap<>();
    ws.signalsByActivityId = new HashMap<>();
    ws.stepInitialAttemptTimes = new HashMap<>();
    ws.latestPartitionStates = new HashMap<>();
    ws.workflowCancelRequestDate = null;
    ws.workflowExecutionClosed = false;
    ws.workflowId = task.workflowExecution().workflowId();
    ws.workflowRunId = task.workflowExecution().runId();

    // Scratch state that only lives for the duration of the history walk.
    Map<Long, String> closedTimersByStartedEventId = new HashMap<>();
    Map<Long, HistoryEvent> closedEventsByScheduledEventId = new HashMap<>();
    Map<String, Set<Long>> partitionMarkerSubsetsFound = new HashMap<>();
    Map<String, List<String>> partitionMarkerSubsets = new HashMap<>();

    // the events should be in reverse-chronological order
    for (HistoryEvent event : task.events()) {
        if (ACTIVITY_SCHEDULING_FAILURE_EVENTS.contains(event.eventType())) {
            // This case is a bit weird. Basically, we submitted a ScheduleActivityTask decision,
            // and SWF failed to schedule it as requested (e.g. we may have exceeded their rate limit
            // for scheduling activities).
            // The decider will deal with this in a couple of ways depending on the exact details,
            // but for our purposes here we just need to do the following:
            // 1. If the retry attempt for this partition is larger than 0, ignore the event entirely.
            // 2. If the retry attempt for this partition is zero, and this is the first time we've seen this
            //    partition id (which will happen if only one attempt to schedule this partition was made,
            //    and it failed), then we need to track that we've seen this partition id but have no
            //    metadata for it.
            // See https://github.com/awslabs/flux-swf-client/issues/33 for details on why this is important.
            // We'll deal with this by treating it as if we know the partition id but have not yet scheduled it.
            // We also have to extract the partition id from the activity id since it's not stored in the
            // event data.
            String activityName = event.scheduleActivityTaskFailedEventAttributes().activityType().name();
            String stepName = TaskNaming.stepNameFromActivityName(activityName);
            String activityId = event.scheduleActivityTaskFailedEventAttributes().activityId();
            // skip the step name and first _
            String retryAttemptAndPartitionId = activityId.substring(stepName.length() + 1);

            // If the step is partitioned, the remainder of the activity id is of the form
            // "{retryAttempt}_{partitionId}".
            // If the step isn't partitioned, we can just ignore this event. It'll get rescheduled properly.
            int underscorePos = retryAttemptAndPartitionId.indexOf('_');
            if (underscorePos >= 0) {
                int retryAttempt = Integer.parseInt(retryAttemptAndPartitionId.substring(0, underscorePos));
                if (retryAttempt > 0) {
                    // Since the retry attempt is nonzero, we can ignore this event entirely.
                    continue;
                }

                String partitionId = retryAttemptAndPartitionId.substring(underscorePos + 1);
                Map<String, PartitionState> partitions
                        = ws.latestPartitionStates.computeIfAbsent(activityName, k -> new HashMap<>());

                // the retry attempt is 0, so we need to check if we previously saw state for this workflow step.
                // if we see an entry for the partition id already, then either:
                //   a) we've seen a successful execution of the partition
                //   b) we've seen a failed attempt to execute attempt 0 of the partition
                // In both cases we can skip this event.
                // Otherwise, store the partition id with no state (null) so that we know we've seen this case.
                if (!partitions.containsKey(partitionId)) {
                    partitions.put(partitionId, null);
                }
            }
        } else if (MARKER_EVENTS.contains(event.eventType())) {
            // There may be markers we don't care about in the history, such as the "unknown result code"
            // marker that Flux adds, or markers added by other tools.
            String markerName = event.markerRecordedEventAttributes().markerName();
            if (TaskNaming.isPartitionMetadataMarker(markerName)) {
                String stepName = TaskNaming.extractPartitionMetadataMarkerStepName(markerName);
                Long subsetId = TaskNaming.extractPartitionMetadataMarkerSubsetId(markerName);

                // It's possible a marker was added more than once for some reason. Only keep the data of the
                // first (i.e. most recent) marker seen for each subset id, so that a repeated subset does not
                // end up in the aggregate marker data twice.
                Set<Long> subsetIdsFound
                        = partitionMarkerSubsetsFound.computeIfAbsent(stepName, k -> new HashSet<>());
                if (subsetIdsFound.add(subsetId)) {
                    partitionMarkerSubsets.computeIfAbsent(stepName, k -> new LinkedList<>())
                                          .add(getMarkerData(event));
                }

                // If we've found all of the subsets, we can record the aggregate marker data.
                // We can then clear out the temporary places we were storing it in since we don't need it
                // anymore.
                Long markerCount = TaskNaming.extractPartitionMetadataMarkerCount(markerName);
                if (subsetIdsFound.size() == markerCount) {
                    // A complete, more recent set of markers may already have been recorded for this step,
                    // so only insert if we don't already have marker data for it.
                    ws.rawPartitionMetadata.putIfAbsent(stepName, partitionMarkerSubsets.get(stepName));
                    partitionMarkerSubsetsFound.remove(stepName);
                    partitionMarkerSubsets.remove(stepName);
                }
            }
        } else if (ACTIVITY_CLOSED_EVENTS.contains(event.eventType())) {
            if (mostRecentClosedEvent == null) {
                mostRecentClosedEvent = event;
            }
            // Remembered so the matching scheduled event (seen later in this walk) can be paired with it.
            closedEventsByScheduledEventId.put(getScheduledEventId(event), event);
        } else if (ACTIVITY_START_EVENTS.contains(event.eventType())) {
            if (mostRecentStartedEvent == null) {
                mostRecentStartedEvent = event;
            }

            if (EventType.WORKFLOW_EXECUTION_STARTED.equals(event.eventType())) {
                ws.workflowStartDate = event.eventTimestamp();
                ws.workflowInput = getStepData(event);
            } else if (EventType.ACTIVITY_TASK_SCHEDULED.equals(event.eventType())) {
                String activityName = getActivityName(event);
                String partitionId = event.activityTaskScheduledEventAttributes().control();

                // Since we're iterating over these in reverse order, we can just always overwrite whatever
                // time is here.
                ws.stepInitialAttemptTimes.put(activityName, event.eventTimestamp());

                // if we already have newer state for this partition, don't bother creating an older
                // PartitionState.
                Map<String, PartitionState> partitions
                        = ws.latestPartitionStates.computeIfAbsent(activityName, k -> new HashMap<>());
                if (!partitions.containsKey(partitionId)) {
                    PartitionState partition
                            = PartitionState.build(event, closedEventsByScheduledEventId.get(event.eventId()));
                    partitions.put(partitionId, partition);
                }
            }
        } else if (TIMER_START_EVENTS.contains(event.eventType())) {
            // The timer will be in the closedTimersByStartedEventId map if it is already closed.
            if (!closedTimersByStartedEventId.containsKey(event.eventId())) {
                TimerData timerData = new TimerData(event);
                ws.openTimers.put(timerData.getTimerId(), timerData);
            }
        } else if (TIMER_CLOSED_EVENTS.contains(event.eventType())) {
            String timerId = getClosedTimerId(event);
            long startEventId = getTimerStartedEventId(event);
            closedTimersByStartedEventId.put(startEventId, timerId);

            // If the timer is in the open timers list, it's because it was reopened.
            // In that case, don't add it to the closed list.
            // If it's in the closed timers list already, then a later version of the timer was already fired;
            // in that case, we don't want to overwrite what's already in the closed list.
            if (!ws.openTimers.containsKey(timerId) && !ws.closedTimers.containsKey(timerId)) {
                ws.closedTimers.put(timerId, event.eventId());
            }
        } else if (SIGNAL_EVENTS.contains(event.eventType())) {
            SignalType type
                    = SignalType.fromFriendlyName(event.workflowExecutionSignaledEventAttributes().signalName());
            if (type != null) {
                BaseSignalData signalData
                        = SignalUtils.decodeSignal(event.workflowExecutionSignaledEventAttributes());
                if (signalData != null) {
                    // since these events are in reverse-chronological order, we can guarantee we only keep
                    // the most recent signal for each activity id by only saving this signal if we don't
                    // already have one.
                    if (!ws.signalsByActivityId.containsKey(signalData.getActivityId())) {
                        signalData.setSignalEventId(event.eventId());
                        signalData.setSignalEventTime(event.eventTimestamp());
                        ws.signalsByActivityId.put(signalData.getActivityId(), signalData);
                    }
                }
            }
        } else if (WORKFLOW_CANCEL_REQUESTED_EVENTS.contains(event.eventType())) {
            // if more than one cancellation request was sent, we'll just use the most recent one
            if (ws.workflowCancelRequestDate == null) {
                ws.workflowCancelRequestDate = event.eventTimestamp();
            }
        } else if (WORKFLOW_END_EVENTS.contains(event.eventType())) {
            ws.workflowExecutionClosed = true;
        }
    }

    if (mostRecentStartedEvent == null) {
        throw new BadWorkflowStateException("Unable to handle a workflow with no start event");
    } else if (ws.workflowStartDate == null) {
        throw new BadWorkflowStateException("Unable to handle a workflow with no WorkflowExecutionStarted event");
    }

    ws.currentActivityName = getActivityName(mostRecentStartedEvent);

    if (ws.currentActivityName != null) {
        Map<String, PartitionState> currentStepPartitions = ws.latestPartitionStates.get(ws.currentActivityName);
        // A missing entry is treated the same as an empty one, rather than failing with a
        // NullPointerException.
        if (currentStepPartitions == null || currentStepPartitions.isEmpty()) {
            throw new BadWorkflowStateException("Found a workflow step with no history");
        }

        ws.currentStepFirstScheduledTime = null;
        ws.currentStepResultCode = StepResult.SUCCEED_RESULT_CODE;
        ws.currentStepCompletionTime = null;
        ws.currentStepLastActivityCompletionMessage = null;
        ws.currentStepMaxRetryCount = 0L;

        boolean hasPartitionNeedingRetry = false;
        for (Map.Entry<String, PartitionState> e : currentStepPartitions.entrySet()) {
            PartitionState lastState = e.getValue();
            if (lastState == null) {
                // If we get here, then we tried to schedule this partition but we got a
                // ScheduleActivityFailedEvent.
                // In this case, we'll need to treat the partition as needing to be retried/rescheduled.
                hasPartitionNeedingRetry = true;
                continue;
            }

            // NOTE(review): these two fields are overwritten by every partition that has a result code, so
            // for a multi-partition step the surviving values depend on the map's iteration order.
            if (lastState.getResultCode() != null) {
                ws.currentStepLastActivityCompletionMessage
                        = lastState.getAttemptOutput().get(StepAttributes.ACTIVITY_COMPLETION_MESSAGE);
                ws.currentStepCompletionTime = lastState.getAttemptCompletedTime();
            }

            ws.currentStepMaxRetryCount = Math.max(ws.currentStepMaxRetryCount, lastState.getRetryAttempt());
            ws.currentStepFirstScheduledTime = ws.stepInitialAttemptTimes.get(ws.currentActivityName);

            // A signal is looked up under the activity id of the attempt following this partition's
            // latest attempt.
            String stepName = TaskNaming.stepNameFromActivityName(ws.currentActivityName);
            String partitionId = e.getKey();
            String signalActivityId
                    = TaskNaming.createActivityId(stepName, lastState.getRetryAttempt() + 1, partitionId);

            String effectiveResultCode = lastState.getResultCode();

            BaseSignalData signal = ws.signalsByActivityId.get(signalActivityId);
            if (signal != null && signal.getSignalType() == SignalType.FORCE_RESULT) {
                // A ForceResult signal overrides whatever result code the last attempt produced.
                effectiveResultCode = ((ForceResultSignalData)signal).getResultCode();
                if (ws.currentStepCompletionTime == null
                        || ws.currentStepCompletionTime.isBefore(signal.getSignalEventTime())) {
                    ws.currentStepCompletionTime = signal.getSignalEventTime();
                }
                if (ws.currentStepLastActivityCompletionMessage == null) {
                    ws.currentStepLastActivityCompletionMessage = FORCED_RESULT_MESSAGE;
                }
            }

            if (lastState.getPartitionCount() == 0) {
                ws.currentStepResultCode = effectiveResultCode;
            } else {
                // For partitioned steps, we need to be more selective about how we generate the current
                // result code.
                // If any partitions need to retry, we retry those partitions.
                // If no partitions need to retry, and at least one partition failed, we use that result.
                // Otherwise, we succeed.
                if (effectiveResultCode != null
                        && !StepResult.FAIL_RESULT_CODE.equals(ws.currentStepResultCode)) {
                    ws.currentStepResultCode = effectiveResultCode;
                } else if (effectiveResultCode == null) {
                    // If we don't have a resultCode for the partition, it means we need to retry it.
                    // NOTE(review): breaking here means any partitions not yet visited do not contribute
                    // to currentStepMaxRetryCount.
                    hasPartitionNeedingRetry = true;
                    break;
                }
            }
        }

        if (hasPartitionNeedingRetry) {
            // At least one partition still has work to do, so the step as a whole has no result yet.
            ws.currentStepResultCode = null;
            ws.currentStepCompletionTime = null;
        }
    }

    return ws;
}