static RespondDecisionTaskCompletedRequest decide()

in flux/src/main/java/software/amazon/aws/clients/swf/flux/poller/DecisionTaskPoller.java [320:454]


    static RespondDecisionTaskCompletedRequest decide(Workflow workflow, WorkflowStep currentStep, String workflowId,
                                                      WorkflowState state, double exponentialBackoffBase,
                                                      MetricRecorder metrics, MetricRecorderFactory metricsFactory)
            throws JsonProcessingException {
        String workflowName = TaskNaming.workflowName(workflow);
        String activityName = (currentStep == null ? null : TaskNaming.activityName(workflowName, currentStep));

        metrics.addProperty(WORKFLOW_ID_METRIC_NAME, workflowId);
        metrics.addProperty(WORKFLOW_RUN_ID_METRIC_NAME, state.getWorkflowRunId());

        List<Decision> decisions = new ArrayList<>();

        String currentStepResultCode = state.getCurrentStepResultCode();

        Map<String, String> nextStepInput = null;

        if (currentStep == null) {
            // the workflow has just started, so the next step's input is the workflow input
            nextStepInput = new HashMap<>(state.getWorkflowInput());
            // Add some workflow attributes to be available to all steps/hooks/etc
            nextStepInput.put(StepAttributes.WORKFLOW_ID, StepAttributes.encode(state.getWorkflowId()));
            nextStepInput.put(StepAttributes.WORKFLOW_EXECUTION_ID, StepAttributes.encode(state.getWorkflowRunId()));
            nextStepInput.put(StepAttributes.WORKFLOW_START_TIME,
                              StepAttributes.encode(state.getWorkflowStartDate()));
        } else {
            // build the next step's input based on the previous step's input and output
            Map<String, PartitionState> currentStepPartitions = state.getLatestPartitionStates(activityName);
            if (currentStepPartitions != null) {
                // Here we just want any arbitrary partition's state, we're just trying to determine the next step's input.
                // We need to make sure we only consider partitions that have state; there is an edge case where
                // we can know about a partition but not have any state for it if SWF failed the first schedule attempt.
                // Note that the .orElse() here can't trigger since we are working against the most recently executed step,
                // for which we always have state.
                // If we didn't have state, then the workflow just started and currentStep would be null, which is handled above.
                PartitionState partition = currentStepPartitions.values().stream()
                                                                .filter(Objects::nonNull).findFirst().orElse(null);

                nextStepInput = new HashMap<>(partition.getAttemptInput());

                boolean currentStepIsPartitioned = PartitionedWorkflowStep.class.isAssignableFrom(currentStep.getClass());
                if (!currentStepIsPartitioned) {
                    // Non-partitioned steps can pass along their output to subsequent steps.
                    // Retry count and result code will be stripped out below.
                    nextStepInput.putAll(partition.getAttemptOutput());
                }

                // Strip out fields that are specific to each attempt, they will be populated below as needed.
                nextStepInput.remove(StepAttributes.PARTITION_ID);
                nextStepInput.remove(StepAttributes.PARTITION_COUNT);
                nextStepInput.remove(StepAttributes.RETRY_ATTEMPT);
                nextStepInput.remove(StepAttributes.RESULT_CODE);
                nextStepInput.remove(StepAttributes.ACTIVITY_COMPLETION_MESSAGE);
            }
        }

        NextStepSelection selection = findNextStep(workflow, currentStep, currentStepResultCode);
        String nextStepNameForContext = null;
        Map<String, String> contextAttributes = new HashMap<>();
        if (selection.workflowShouldClose() || state.isWorkflowCancelRequested()) {

            // If workflow cancellation was requested, we need to check whether any individual tasks need to be canceled.
            if (state.isWorkflowCancelRequested()) {
                for (PartitionState partition : state.getLatestPartitionStates(activityName).values()) {
                    // For each in-flight partition missing an attempt result (meaning, a worker is actively working it),
                    // request cancellation of the activity task.
                    if (partition != null && partition.getAttemptResult() == null) {
                        decisions.add(buildRequestCancelActivityTaskDecision(partition.getActivityId()));
                    }
                }

                // Also cancel any open retry timers.
                for (String timerId : state.getOpenTimers().keySet()) {
                    decisions.add(buildCancelTimerDecision(timerId));
                }
            }


            Decision decision = handleWorkflowCompletion(workflow, workflowId, state, metrics);
            if (decision != null) {
                decisions.add(decision);
            }

            // if the workflow is ending and this is a periodic workflow, we set the 'next step' field in the execution context
            // to _delayExit to help indicate the reason that the workflow execution hasn't actually closed yet.
            if (workflow.getClass().isAnnotationPresent(Periodic.class)) {
                nextStepNameForContext = DELAY_EXIT_TIMER_ID;
            }
        } else if (selection.isNextStepUnknown()) {
            // currentStep can't be null if the next step is unknown, since if the current step is null,
            // we _always_ schedule the first step of the workflow.
            nextStepNameForContext = currentStep.getClass().getSimpleName();
            WorkflowGraphNode nextNode = workflow.getGraph().getNodes().get(currentStep.getClass());
            Map<String, String> resultCodeMap = getResultCodeMapForContext(nextNode);
            contextAttributes.put(EXECUTION_CONTEXT_NEXT_STEP_RESULT_CODES, StepAttributes.encode(resultCodeMap));

            decisions.addAll(handleUnknownResultCode(workflow, currentStep, currentStepResultCode, state,
                                                     resultCodeMap.keySet(), metrics));
        } else {
            if (currentStep != null && currentStep != selection.getNextStep()) {
                Duration stepDuration = Duration.between(state.getCurrentStepFirstScheduledTime(),
                                                         state.getCurrentStepCompletionTime());
                metrics.addDuration(formatStepCompletionTimeMetricName(activityName), stepDuration);
                // retryAttempt+1 because if we didn't retry at all, retryAttempt will be 0, and we want total count
                metrics.addCount(formatStepAttemptCountForCompletionMetricName(activityName),
                                 state.getCurrentStepMaxRetryCount() + 1);
            }

            // It's possible we got here after e.g. a CancelWorkflowExecution decision has been made, or if the workflow
            // has otherwise ended. In that case, just bail without making any decisions.
            if (state.isWorkflowExecutionClosed()) {
                log.warn("Workflow {} is already closed, so no new tasks can be scheduled.", workflowId);
            } else {
                decisions.addAll(handleStepScheduling(workflow, workflowId, selection.getNextStep(), state, nextStepInput,
                                                      exponentialBackoffBase, metrics, metricsFactory));

                nextStepNameForContext = selection.getNextStep().getClass().getSimpleName();

                WorkflowGraphNode nextNode = workflow.getGraph().getNodes().get(selection.getNextStep().getClass());
                Map<String, String> resultCodeMap = getResultCodeMapForContext(nextNode);
                contextAttributes.put(EXECUTION_CONTEXT_NEXT_STEP_RESULT_CODES, StepAttributes.encode(resultCodeMap));

            }
        }

        if (nextStepNameForContext != null) {
            contextAttributes.put(EXECUTION_CONTEXT_NEXT_STEP_NAME, StepAttributes.encode(nextStepNameForContext));
        }

        String executionContext = null;
        if (!contextAttributes.isEmpty()) {
            executionContext = StepAttributes.encode(contextAttributes);
        }

        return RespondDecisionTaskCompletedRequest.builder().decisions(decisions).executionContext(executionContext).build();
    }