private static List<Decision> handleStepScheduling(...)

in flux/src/main/java/software/amazon/aws/clients/swf/flux/poller/DecisionTaskPoller.java [468:773]


    /**
     * Builds the decisions needed to schedule, or re-schedule, {@code nextStep} for a workflow execution.
     *
     * <p>The method works in two phases:
     * <ol>
     *   <li>If {@code nextStep} is partitioned, no partition metadata is available for it, and no partition has
     *       any recorded attempt, partition IDs are generated and only the RecordMarker decisions carrying the
     *       metadata (plus a decision that forces another decision task) are returned. Nothing is scheduled in
     *       that pass.</li>
     *   <li>Otherwise every partition (or the single {@code null} partition of a non-partitioned step) is examined:
     *       completed and in-flight partitions are skipped, retries are gated on their retry timer and any signal
     *       addressed to the retry's activity id, and everything else gets a ScheduleActivityTask decision.</li>
     * </ol>
     *
     * @param workflow               the workflow being decided; used for naming and to build the schedule attributes
     * @param workflowId             the workflow execution id; used for logging and partition id generation
     * @param nextStep               the step to schedule
     * @param state                  the workflow state rebuilt from history (partition states, timers, signals)
     * @param nextStepInput          the input attributes for the step. NOTE: this map is modified in place
     *                               (partition metadata attributes are merged in; in the fallback path the
     *                               partition-specific attributes are removed).
     * @param exponentialBackoffBase passed through to the retry backoff calculation
     * @param fluxMetrics            passed through to signal handling
     * @param metricsFactory         passed through to partition id generation
     * @return the decisions to send back; may be empty if there is nothing to do yet
     * @throws JsonProcessingException   declared by this method; raised by one of the helpers it delegates to
     * @throws BadWorkflowStateException if a non-partitioned step already has a result code, or if a partitioned
     *                                   step has neither partition metadata nor any successfully-scheduled partition
     *                                   to recover its attributes from
     */
    private static List<Decision> handleStepScheduling(Workflow workflow, String workflowId, WorkflowStep nextStep,
                                                       WorkflowState state, Map<String, String> nextStepInput,
                                                       double exponentialBackoffBase, MetricRecorder fluxMetrics,
                                                       MetricRecorderFactory metricsFactory) throws JsonProcessingException {
        List<Decision> decisions = new LinkedList<>();

        String workflowName = TaskNaming.workflowName(workflow);
        String nextStepName = TaskNaming.stepName(nextStep);
        String nextActivityName = TaskNaming.activityName(workflowName, nextStepName);
        boolean nextStepIsPartitioned = nextStep instanceof PartitionedWorkflowStep;

        PartitionMetadata nextStepPartitionMetadata = state.getPartitionMetadata(nextStepName);

        // We only generate partition metadata for a partitioned step that has no metadata marker yet AND for which
        // no partition has non-empty state. That covers both:
        //  - a brand new partitioned step (no state at all), and
        //  - a step whose partitions all had their first attempt rejected by SWF (only empty state).
        // If at least one partition was successfully scheduled without a marker (possible during deployment of
        // the marker change), we must not regenerate the ids; the fallback paths below handle that case.
        boolean generatePartitionMetadata = false;
        if (nextStepIsPartitioned && nextStepPartitionMetadata == null) {
            generatePartitionMetadata = state.getLatestPartitionStates(nextActivityName)
                    .values().stream().noneMatch(Objects::nonNull);
        }

        if (generatePartitionMetadata) {
            // If we need to generate partition ids, then we need to return two kinds of decisions.
            // First, we need to record marker(s) containing the partition metadata.
            // Second, like when we cancel a retry timer, we need to force another decision to occur.
            // We do them separately to reduce the likelihood that throttling of activity scheduling affects recording
            // the marker, and (less importantly) because the marker will use up some of the 1MB of data we can send
            // back in the decision response, which may reduce the number of partitions we could schedule.
            PartitionIdGeneratorResult result
                    = WorkflowStepUtil.getPartitionIdsForPartitionedStep((PartitionedWorkflowStep)nextStep,
                                                                         nextStepInput, workflowName,
                                                                         workflowId, metricsFactory);
            PartitionMetadata metadata = PartitionMetadata.fromPartitionIdGeneratorResult(result);

            // The metadata may be split across several markers; each marker name encodes its index and the total.
            List<String> markerDetailsList = metadata.toMarkerDetailsList();
            for (int i = 0; i < markerDetailsList.size(); i++) {
                String markerDetails = markerDetailsList.get(i);
                RecordMarkerDecisionAttributes markerAttrs = RecordMarkerDecisionAttributes.builder()
                        .markerName(TaskNaming.partitionMetadataMarkerName(nextStepName, i, markerDetailsList.size()))
                        .details(markerDetails)
                        .build();
                Decision marker = Decision.builder().decisionType(DecisionType.RECORD_MARKER)
                        .recordMarkerDecisionAttributes(markerAttrs)
                        .build();
                decisions.add(marker);
            }

            decisions.add(decisionToForceNewDecision());

            // As noted above, we don't want to schedule the partitions at the same time, so we'll return immediately.
            return decisions;
        }

        // Now we need to extract the set of partition IDs.
        Set<String> partitionIds;
        if (nextStepIsPartitioned) {
            if (nextStepPartitionMetadata != null) {
                partitionIds = nextStepPartitionMetadata.getPartitionIds();
            } else {
                // In this case, we don't have partition metadata but we didn't decide to generate partition metadata.
                // We'll need to extract the partition IDs from the set of partitions we have state for.
                // This should only happen during deployment of the partition metadata marker change.
                partitionIds = state.getLatestPartitionStates(nextActivityName).keySet();
            }
        } else {
            // Non-partitioned steps are modeled as a single partition with a null id.
            partitionIds = Collections.singleton(null);
        }

        // For partitioned steps, there are a few edge cases to handle.
        //
        // 1) If we have a partition metadata marker, we're not in the edge case.
        // 2) If all partitions had at least one successfully-scheduled attempt, then we proceed without the marker.
        //    This handles backward-compatibility during deployment of the marker change in the absence of throttling.
        // 3) If SWF has rejected the first attempt to schedule at least one partition, and zero partitions
        //    have been successfully scheduled, we generate the metadata marker.
        //    That case is handled above and returns early, so it never reaches this point.
        // 4) If SWF has rejected the first attempt to schedule at least one partition, but has successfully scheduled
        //    at least one attempt for at least one other partition, then we can proceed without the marker,
        //    but we need to extract any additional attributes from the input to one of the successfully-scheduled
        //    partitions.
        //
        //  We could just abandon the workflow if we hit case 4, but it is better to allow users to force the workflow
        //  into a rollback or recovery path in the workflow graph if possible, rather than requiring them to terminate
        //  the workflow in all cases.

        // If we have partition metadata, we should put its additional attributes into the next step input.
        if (nextStepPartitionMetadata != null) {
            nextStepInput.putAll(nextStepPartitionMetadata.getEncodedAdditionalAttributes());
        } else if (nextStepIsPartitioned) {
            // If we get here, we need to extract the additional attributes from one of the successfully-scheduled
            // partitions. We should always find one: had there been none, we would have generated partition
            // metadata and returned earlier. We still fail explicitly rather than with a bare NullPointerException
            // in case that invariant is ever broken.
            PartitionState lastState = state.getLatestPartitionStates(nextActivityName)
                    .values().stream().filter(Objects::nonNull).findFirst().orElse(null);
            if (lastState == null) {
                String msg = String.format("Partitioned step %s.%s has neither partition metadata nor any"
                                           + " successfully-scheduled partition.", workflowName, nextActivityName);
                log.error(msg);
                throw new BadWorkflowStateException(msg);
            }

            // Attributes already present in the input win over the ones recovered from the earlier attempt.
            for (Map.Entry<String, String> e : lastState.getAttemptInput().entrySet()) {
                nextStepInput.putIfAbsent(e.getKey(), e.getValue());
            }

            // We'll need to remove the partition-specific inputs
            nextStepInput.remove(StepAttributes.PARTITION_ID);
            nextStepInput.remove(StepAttributes.RETRY_ATTEMPT);
        }

        // The previous step's open retry timers do not depend on which partition of nextStep we are looking at,
        // so we only need to sweep them once per invocation; doing it per partition would emit duplicate
        // CancelTimer decisions for the same timer id.
        boolean previousStepTimersChecked = false;

        for (String partitionId : partitionIds) {
            PartitionState lastAttempt = state.getLatestPartitionStates(nextActivityName).get(partitionId);

            if (lastAttempt != null && lastAttempt.getResultCode() != null) {
                // Non-partitioned steps should not get here.
                if (!nextStepIsPartitioned) {
                    String msg = String.format("We cannot reschedule a non-partitioned step %s.%s that already has a result %s.",
                                               workflowName, nextActivityName, lastAttempt.getResultCode());
                    log.error(msg);
                    throw new BadWorkflowStateException(msg);
                }
                log.info("Workflow {} step {}.{} partition {} already completed ({}), not rescheduling.",
                         workflowId, workflowName, nextActivityName, partitionId, lastAttempt.getResultCode());
                continue;
            }

            boolean retrying = false;
            long attemptNumber = 0L;
            if (lastAttempt != null && lastAttempt.getAttemptResult() == null) {
                // The last attempt was already scheduled (possibly even started) but hasn't finished yet.
                // Just move on to the next partition.
                continue;
            } else if (lastAttempt != null) {
                // This is a retry.
                // We need to check for a timer for this particular retry.
                String encoded = lastAttempt.getAttemptInput().get(StepAttributes.RETRY_ATTEMPT);
                if (encoded != null) {
                    attemptNumber = StepAttributes.decode(Long.class, encoded);
                }
                // We're going to schedule the next attempt, so we need to bump the attempt number.
                attemptNumber += 1L;
                retrying = true;
            } else if (state.getLatestPartitionStates(nextActivityName).containsKey(partitionId)) {
                // The partition is known but has empty state: we tried to schedule its first attempt
                // but SWF gave us a ScheduleActivityTaskFailed event. In this case we need to try again.
                retrying = true;
            }
            // Otherwise there is no record of this partition at all: this is a genuine first attempt, so
            // 'retrying' stays false and the previous step's retry timers get cleaned up below.

            String activityId = TaskNaming.createActivityId(nextStep, attemptNumber, partitionId);
            boolean hasForceResultSignal = false;
            if (state.getSignalsByActivityId().containsKey(activityId)
                    && state.getSignalsByActivityId().get(activityId).getSignalType() == SignalType.FORCE_RESULT) {
                hasForceResultSignal = true;
                retrying = false;
            }

            // Now we need to make a decision for this attempt.
            // If it's not the first attempt, we need to check timers for this activityId.
            // - If a timer is open, there's nothing to do yet unless we got a RetryNow or DelayRetry signal,
            //   or the workflow was canceled.
            // - If no timer is open and no timer has fired, add a StartTimer decision.
            // - Otherwise, schedule the next attempt.
            if (attemptNumber > 0) {
                // this is a retry if we get in here.
                BaseSignalData signal = state.getSignalsByActivityId().get(activityId);
                if (state.getOpenTimers().containsKey(activityId)) {
                    if (signal != null) {
                        // The retry timer id matches the next attempt's activity id, which matches the signal's activity id.
                        // ScheduleDelayedRetry events need to be ignored if the timer is still open.

                        // first check if the signal is older than the scheduled event for the retry timer, if so ignore it.
                        TimerData openTimer = state.getOpenTimers().get(activityId);
                        if (signal.getSignalEventId() > openTimer.getStartTimerEventId()) {
                            decisions.addAll(handleSignal(state, signal, partitionId, nextActivityName, fluxMetrics));
                        }
                        // If the signal was not ForceResult, we don't want to make any more decisions for this partition.
                        // If it was ForceResult, we may or may not need to schedule the next step, we need to let that code decide.
                        if (signal.getSignalType() != SignalType.FORCE_RESULT) {
                            continue;
                        }
                    } else {
                        log.debug("Processed a decision for workflow {} activity {} but there was still an open timer,"
                                  + " no action taken.", workflowId, activityId);
                        // continue to the next partition, we don't want to make any more decisions for this one
                        continue;
                    }
                } else if (signal != null) { // if there isn't an open timer but we have a ScheduleDelayedRetry signal
                    if (signal.getSignalType() == SignalType.SCHEDULE_DELAYED_RETRY) {
                        // first check if the signal is older than the close event for the last retry timer, if so ignore it.
                        if (!state.getClosedTimers().containsKey(activityId)
                                || signal.getSignalEventId() > state.getClosedTimers().get(activityId)) {
                            decisions.addAll(handleSignal(state, signal, partitionId, nextActivityName, fluxMetrics));
                            // continue to the next partition, we don't want to make any more decisions for this one
                            continue;
                        }
                    } else {
                        log.debug("Processed a decision for workflow {} activity {} but there was no open timer,"
                                  + " no action taken.", workflowId, activityId);
                        // no continue here, we may want to schedule a normal retry timer, or the next step.
                    }
                }

                // check whether we've already closed the retry timer. If not, start it.
                if (!state.getClosedTimers().containsKey(activityId) && !state.getOpenTimers().containsKey(activityId)) {
                    long delayInSeconds = RetryUtils.calculateRetryBackoffInSeconds(nextStep, attemptNumber,
                                                                                    exponentialBackoffBase);
                    StartTimerDecisionAttributes attrs = buildStartTimerDecisionAttrs(activityId, delayInSeconds, partitionId);

                    Decision decision = Decision.builder().decisionType(DecisionType.START_TIMER)
                                                          .startTimerDecisionAttributes(attrs)
                                                          .build();

                    log.debug("Workflow {} will have activity {} scheduled after a delay of {} seconds.",
                              workflowId, activityId, delayInSeconds);

                    decisions.add(decision);
                    // continue to the next partition, we don't want to make any more decisions for this one
                    continue;
                }
            } else if (!retrying && !previousStepTimersChecked && state.getCurrentActivityName() != null) {
                previousStepTimersChecked = true;

                // We're about to schedule the first attempt of the next step.
                // We need to check if the previous step ended due to a ForceResult signal; if so,
                // we may need to cancel its retry timer. We don't really know which partition it might have been,
                // so it's easiest to just check for open timers for any partition of the previous step and cancel them.
                String currentStepName = TaskNaming.stepNameFromActivityName(state.getCurrentActivityName());
                Map<String, PartitionState> prevStep = state.getLatestPartitionStates(state.getCurrentActivityName());
                for (Map.Entry<String, PartitionState> prevPartition : prevStep.entrySet()) {
                    PartitionState prevState = prevPartition.getValue();
                    if (prevState == null) {
                        // No recorded attempt for this partition, so there is no attempt number from which
                        // to derive a retry timer id.
                        continue;
                    }

                    // A retry timer's id is the activity id of the attempt it would trigger, i.e. last attempt + 1.
                    String prevActivityId = TaskNaming.createActivityId(currentStepName, prevState.getRetryAttempt() + 1,
                                                                        prevPartition.getKey());
                    if (state.getOpenTimers().containsKey(prevActivityId)) {
                        decisions.add(buildCancelTimerDecision(prevActivityId));
                    }
                }
            }

            // if we got a ForceResult signal for this partition attempt, we've probably just cancelled the timer,
            // so we don't want to schedule a retry for it.
            if (hasForceResultSignal) {
                continue;
            }

            // If we get this far, we know we're scheduling a run of nextStep.
            // First let's populate the attempt-specific fields into a per-partition copy of the next step input.
            Map<String, String> actualInput = new TreeMap<>(nextStepInput);
            if (attemptNumber > 0L) {
                actualInput.put(StepAttributes.RETRY_ATTEMPT, Long.toString(attemptNumber));
            }
            if (partitionId != null) {
                actualInput.put(StepAttributes.PARTITION_ID, StepAttributes.encode(partitionId));
                actualInput.put(StepAttributes.PARTITION_COUNT, Long.toString(partitionIds.size()));
            }
            // Carry the step's initial attempt time forward across retries; if none is recorded, this is it.
            Instant firstAttemptDate = state.getStepInitialAttemptTime(nextActivityName);
            if (firstAttemptDate == null) {
                firstAttemptDate = Instant.now();
            }
            actualInput.put(StepAttributes.ACTIVITY_INITIAL_ATTEMPT_TIME, StepAttributes.encode(firstAttemptDate));

            ScheduleActivityTaskDecisionAttributes attrs
                    = buildScheduleActivityTaskDecisionAttrs(workflow, nextStep, actualInput, activityId);

            // We'll save the partition id in the control field for convenience in debugging and testing,
            // and to reference when rebuilding partition state to avoid having to inspect the step's input attributes.
            attrs = attrs.toBuilder().control(partitionId).build();

            Decision decision = Decision.builder().decisionType(DecisionType.SCHEDULE_ACTIVITY_TASK)
                                                  .scheduleActivityTaskDecisionAttributes(attrs)
                                                  .build();
            decisions.add(decision);

            log.debug("Workflow {} will have activity {} scheduled for execution.", workflowId, activityId);

        }
        return decisions;
    }