public static WorkflowState build()

in flux/src/main/java/software/amazon/aws/clients/swf/flux/poller/WorkflowState.java [227:482]


    /**
     * Builds a {@link WorkflowState} by replaying the history events of a decision task.
     *
     * <p>The replay assumes {@code task.events()} is in reverse-chronological order (newest first). Several
     * "first one seen wins" checks below rely on that ordering:
     * <ul>
     *   <li>the most recent activity start event;</li>
     *   <li>the most recent cancellation request;</li>
     *   <li>the most recent signal per activity id;</li>
     *   <li>the newest state per partition.</li>
     * </ul>
     * NOTE(review): the ordering is determined by how the decision task was polled; confirm that the poller
     * requests reverse order.
     *
     * @param task the decision task whose event history should be summarized
     * @return the reconstructed workflow state
     * @throws BadWorkflowStateException if the history has no start event, has no WorkflowExecutionStarted
     *                                   event, or has no partition history for the current step
     */
    public static WorkflowState build(PollForDecisionTaskResponse task) {
        HistoryEvent mostRecentStartedEvent = null;

        WorkflowState ws = new WorkflowState();
        ws.rawPartitionMetadata = new HashMap<>();
        ws.openTimers = new HashMap<>();
        ws.closedTimers = new HashMap<>();
        ws.signalsByActivityId = new HashMap<>();

        ws.stepInitialAttemptTimes = new HashMap<>();
        ws.latestPartitionStates = new HashMap<>();

        ws.workflowCancelRequestDate = null;
        ws.workflowExecutionClosed = false;

        ws.workflowId = task.workflowExecution().workflowId();
        ws.workflowRunId = task.workflowExecution().runId();

        // Timer start event id -> timer id. A timer's close event is newer than its start event,
        // so (in reverse order) it is recorded here before the matching start event is visited.
        Map<Long, String> closedTimersByStartedEventId = new HashMap<>();

        // Scheduled event id -> the close event of that activity attempt.
        Map<Long, HistoryEvent> closedEventsByScheduledEventId = new HashMap<>();

        // Per-step partial partition-metadata marker data, kept until every subset of the marker has been seen.
        Map<String, Set<Long>> partitionMarkerSubsetsFound = new HashMap<>();
        Map<String, List<String>> partitionMarkerSubsets = new HashMap<>();

        // the events should be in reverse-chronological order
        for (HistoryEvent event : task.events()) {
            if (ACTIVITY_SCHEDULING_FAILURE_EVENTS.contains(event.eventType())) {
                // This case is a bit weird. Basically, we submitted a ScheduleActivityTask decision,
                // and SWF failed to schedule it as requested (e.g. we may have exceeded their rate limit
                // for scheduling activities).

                // The decider will deal with this in a couple of ways depending on the exact details,
                // but for our purposes here we just need to do the following:
                // 1. If the retry attempt for this partition is larger than 0, ignore the event entirely.
                // 2. If the retry attempt for this partition is zero, and this is the first time we've seen this partition id
                //    (which will happen if only one attempt to schedule this partition was made, and it failed),
                //    then we need to track that we've seen this partition id but have no metadata for it.
                // See https://github.com/awslabs/flux-swf-client/issues/33 for details on why this is important.

                // We'll deal with this by treating it as if we know the partition id but have not yet scheduled it.
                // We also have to extract the partition id from the activity id since it's not stored in the event data.
                String activityName = event.scheduleActivityTaskFailedEventAttributes().activityType().name();
                String stepName = TaskNaming.stepNameFromActivityName(activityName);
                String activityId = event.scheduleActivityTaskFailedEventAttributes().activityId();
                String retryAttemptAndPartitionId = activityId.substring(stepName.length() + 1); // skip the step name and first _

                // If the step is partitioned, the remainder of the activity id is of the form "{retryAttempt}_{partitionId}".
                // If the step isn't partitioned, we can just ignore this event. It'll get rescheduled properly.
                int underscorePos = retryAttemptAndPartitionId.indexOf('_');
                if (underscorePos >= 0) {
                    int retryAttempt = Integer.parseInt(retryAttemptAndPartitionId.substring(0, underscorePos));
                    if (retryAttempt > 0) {
                        // Since the retry attempt is nonzero, we can ignore this event entirely.
                        continue;
                    }

                    String partitionId = retryAttemptAndPartitionId.substring(underscorePos + 1);
                    Map<String, PartitionState> partitionStates
                            = ws.latestPartitionStates.computeIfAbsent(activityName, k -> new HashMap<>());

                    // The retry attempt is 0, so check whether we previously saw state for this partition.
                    // If an entry exists, then either:
                    // a) we've seen a successful execution of the partition, or
                    // b) we've seen a failed attempt to execute attempt 0 of the partition.
                    // In both cases we skip this event. Otherwise, record the partition id with no state
                    // (a null value) so that we know we've seen this case.
                    if (!partitionStates.containsKey(partitionId)) {
                        partitionStates.put(partitionId, null);
                    }
                }
            } else if (MARKER_EVENTS.contains(event.eventType())) {
                // There may be markers we don't care about in the history, such as the "unknown result code" marker that Flux adds,
                // or markers added by other tools.
                String markerName = event.markerRecordedEventAttributes().markerName();
                if (TaskNaming.isPartitionMetadataMarker(markerName)) {
                    String stepName = TaskNaming.extractPartitionMetadataMarkerStepName(markerName);
                    Long subsetId = TaskNaming.extractPartitionMetadataMarkerSubsetId(markerName);

                    Set<Long> subsetsFound = partitionMarkerSubsetsFound.computeIfAbsent(stepName, k -> new HashSet<>());
                    List<String> subsetData = partitionMarkerSubsets.computeIfAbsent(stepName, k -> new LinkedList<>());

                    // Keep only one copy of each subset's data. Set.add returns false for a subset id we have
                    // already seen, so a duplicated marker cannot inject repeated data into the aggregate.
                    if (subsetsFound.add(subsetId)) {
                        subsetData.add(getMarkerData(event));
                    }

                    // If we've found all of the subsets, we can record the aggregate marker data.
                    // We can then clear out the temporary places we were storing it in since we don't need it anymore.
                    Long markerCount = TaskNaming.extractPartitionMetadataMarkerCount(markerName);
                    if (subsetsFound.size() == markerCount) {
                        // It's possible a marker was added more than once for some reason,
                        // so only insert if we don't already have marker data for this step.
                        ws.rawPartitionMetadata.putIfAbsent(stepName, subsetData);

                        partitionMarkerSubsetsFound.remove(stepName);
                        partitionMarkerSubsets.remove(stepName);
                    }
                }
            } else if (ACTIVITY_CLOSED_EVENTS.contains(event.eventType())) {
                closedEventsByScheduledEventId.put(getScheduledEventId(event), event);
            } else if (ACTIVITY_START_EVENTS.contains(event.eventType())) {
                if (mostRecentStartedEvent == null) {
                    mostRecentStartedEvent = event;
                }

                if (EventType.WORKFLOW_EXECUTION_STARTED.equals(event.eventType())) {
                    ws.workflowStartDate = event.eventTimestamp();
                    ws.workflowInput = getStepData(event);
                } else if (EventType.ACTIVITY_TASK_SCHEDULED.equals(event.eventType())) {
                    String activityName = getActivityName(event);
                    String partitionId = event.activityTaskScheduledEventAttributes().control();

                    // Since we're iterating over these in reverse order, we can just always overwrite whatever time is here.
                    ws.stepInitialAttemptTimes.put(activityName, event.eventTimestamp());

                    // If we already have newer state for this partition, don't bother creating an older PartitionState.
                    // containsKey (not putIfAbsent) is required: a null value is a deliberate "scheduling failed"
                    // placeholder and must not be overwritten.
                    Map<String, PartitionState> partitionStates
                            = ws.latestPartitionStates.computeIfAbsent(activityName, k -> new HashMap<>());
                    if (!partitionStates.containsKey(partitionId)) {
                        PartitionState partition = PartitionState.build(event, closedEventsByScheduledEventId.get(event.eventId()));
                        partitionStates.put(partitionId, partition);
                    }
                }
            } else if (TIMER_START_EVENTS.contains(event.eventType())) {
                // The timer will be in the closedTimersByStartedEventId map if it is already closed.
                if (!closedTimersByStartedEventId.containsKey(event.eventId())) {
                    TimerData timerData = new TimerData(event);
                    ws.openTimers.put(timerData.getTimerId(), timerData);
                }
            } else if (TIMER_CLOSED_EVENTS.contains(event.eventType())) {
                String timerId = getClosedTimerId(event);
                long startEventId = getTimerStartedEventId(event);
                closedTimersByStartedEventId.put(startEventId, timerId);
                // If the timer is in the open timers list, it's because it was reopened.
                // In that case, don't add it to the closed list.
                // If it's in the closed timers list already, then a later version of the timer was already fired;
                // in that case, we don't want to overwrite what's already in the closed list.
                if (!ws.openTimers.containsKey(timerId) && !ws.closedTimers.containsKey(timerId)) {
                    ws.closedTimers.put(timerId, event.eventId());
                }
            } else if (SIGNAL_EVENTS.contains(event.eventType())) {
                SignalType type = SignalType.fromFriendlyName(event.workflowExecutionSignaledEventAttributes().signalName());
                if (type != null) {
                    BaseSignalData signalData = SignalUtils.decodeSignal(event.workflowExecutionSignaledEventAttributes());
                    if (signalData != null) {
                        // since these events are in reverse-chronological order, we can guarantee we only keep the most recent
                        // event of each type by only saving this signal if we don't already have one.
                        if (!ws.signalsByActivityId.containsKey(signalData.getActivityId())) {
                            signalData.setSignalEventId(event.eventId());
                            signalData.setSignalEventTime(event.eventTimestamp());
                            ws.signalsByActivityId.put(signalData.getActivityId(), signalData);
                        }
                    }
                }
            } else if (WORKFLOW_CANCEL_REQUESTED_EVENTS.contains(event.eventType())) {
                // if more than one cancellation request was sent, we'll just use the most recent one
                if (ws.workflowCancelRequestDate == null) {
                    ws.workflowCancelRequestDate = event.eventTimestamp();
                }
            } else if (WORKFLOW_END_EVENTS.contains(event.eventType())) {
                ws.workflowExecutionClosed = true;
            }
        }

        if (mostRecentStartedEvent == null) {
            throw new BadWorkflowStateException("Unable to handle a workflow with no start event");
        } else if (ws.workflowStartDate == null) {
            throw new BadWorkflowStateException("Unable to handle a workflow with no WorkflowExecutionStarted event");
        }

        ws.currentActivityName = getActivityName(mostRecentStartedEvent);

        if (ws.currentActivityName != null) {
            Map<String, PartitionState> currentStepPartitions = ws.latestPartitionStates.get(ws.currentActivityName);
            // Treat a missing entry the same as an empty one so a malformed history produces
            // BadWorkflowStateException rather than a NullPointerException.
            if (currentStepPartitions == null || currentStepPartitions.isEmpty()) {
                throw new BadWorkflowStateException("Found a workflow step with no history");
            }

            ws.currentStepFirstScheduledTime = null;
            ws.currentStepResultCode = StepResult.SUCCEED_RESULT_CODE;
            ws.currentStepCompletionTime = null;
            ws.currentStepLastActivityCompletionMessage = null;
            ws.currentStepMaxRetryCount = 0L;

            boolean hasPartitionNeedingRetry = false;

            for (Map.Entry<String, PartitionState> e : currentStepPartitions.entrySet()) {
                PartitionState lastState = e.getValue();
                if (lastState == null) {
                    // If we get here, then we tried to schedule this partition but we got a ScheduleActivityFailedEvent.
                    // In this case, we'll need to treat the partition as needing to be retried/rescheduled.
                    hasPartitionNeedingRetry = true;
                    continue;
                }

                if (lastState.getResultCode() != null) {
                    ws.currentStepLastActivityCompletionMessage
                            = lastState.getAttemptOutput().get(StepAttributes.ACTIVITY_COMPLETION_MESSAGE);
                    ws.currentStepCompletionTime = lastState.getAttemptCompletedTime();
                }

                ws.currentStepMaxRetryCount = Math.max(ws.currentStepMaxRetryCount, lastState.getRetryAttempt());

                ws.currentStepFirstScheduledTime = ws.stepInitialAttemptTimes.get(ws.currentActivityName);

                String stepName = TaskNaming.stepNameFromActivityName(ws.currentActivityName);
                String partitionId = e.getKey();
                // A FORCE_RESULT signal targets the partition's *next* attempt, hence retryAttempt + 1.
                String signalActivityId = TaskNaming.createActivityId(stepName, lastState.getRetryAttempt() + 1, partitionId);

                String effectiveResultCode = lastState.getResultCode();
                BaseSignalData signal = ws.signalsByActivityId.get(signalActivityId);
                if (signal != null && signal.getSignalType() == SignalType.FORCE_RESULT) {
                    effectiveResultCode = ((ForceResultSignalData)signal).getResultCode();
                    if (ws.currentStepCompletionTime == null
                            || ws.currentStepCompletionTime.isBefore(signal.getSignalEventTime())) {
                        ws.currentStepCompletionTime = signal.getSignalEventTime();
                    }
                    if (ws.currentStepLastActivityCompletionMessage == null) {
                        ws.currentStepLastActivityCompletionMessage = FORCED_RESULT_MESSAGE;
                    }
                }

                if (lastState.getPartitionCount() == 0) {
                    ws.currentStepResultCode = effectiveResultCode;
                } else {
                    // For partitioned steps, we need to be more selective about how we generate the current result code.
                    // If any partitions need to retry, we retry those partitions.
                    // If no partitions need to retry, and at least one partition failed, we use that result.
                    // Otherwise, we succeed.
                    if (effectiveResultCode != null
                            && !StepResult.FAIL_RESULT_CODE.equals(ws.currentStepResultCode)) {
                        ws.currentStepResultCode = effectiveResultCode;
                    } else if (effectiveResultCode == null) {
                        // If we don't have a resultCode for the partition, it means we need to retry it.
                        // Keep iterating instead of breaking out: otherwise currentStepMaxRetryCount would only
                        // cover partitions visited so far, which depends on HashMap iteration order.
                        hasPartitionNeedingRetry = true;
                    }
                }
            }

            if (hasPartitionNeedingRetry) {
                ws.currentStepResultCode = null;
                ws.currentStepCompletionTime = null;
            }
        }
        return ws;
    }