in flux/src/main/java/software/amazon/aws/clients/swf/flux/poller/DecisionTaskPoller.java [320:454]
/**
 * Builds the response to a decision task for the given workflow execution.
 *
 * The method works in three phases:
 * 1. Computes the input map for the next step (from the workflow input when the workflow just started,
 *    otherwise from the most recent step's attempt input and, for non-partitioned steps, its output).
 * 2. Selects the next step via findNextStep and produces decisions: close/cancel handling,
 *    unknown-result-code handling, or scheduling of the next step.
 * 3. Encodes the "next step" name and result-code map into the execution context.
 *
 * The workflow id and run id are added as properties on the supplied metrics recorder.
 *
 * @param workflow the workflow definition being executed
 * @param currentStep the most recently executed step, or null if the workflow has just started
 * @param workflowId the id of the workflow execution (used for metrics, logging and completion handling)
 * @param state the current state of the workflow execution; must not be null
 * @param exponentialBackoffBase passed through to step scheduling
 * @param metrics recorder that receives properties, durations and counts for this decision
 * @param metricsFactory passed through to step scheduling
 * @return a request containing the decisions made and the encoded execution context
 *         (the execution context is null when there are no context attributes)
 * @throws JsonProcessingException propagated from the helpers invoked here
 *         (presumably from JSON-encoding attributes; confirm against the helpers' signatures)
 */
static RespondDecisionTaskCompletedRequest decide(Workflow workflow, WorkflowStep currentStep, String workflowId,
                                                  WorkflowState state, double exponentialBackoffBase,
                                                  MetricRecorder metrics, MetricRecorderFactory metricsFactory)
        throws JsonProcessingException {
    String workflowName = TaskNaming.workflowName(workflow);
    // activityName stays null until the first step has been selected.
    String activityName = (currentStep == null ? null : TaskNaming.activityName(workflowName, currentStep));

    metrics.addProperty(WORKFLOW_ID_METRIC_NAME, workflowId);
    metrics.addProperty(WORKFLOW_RUN_ID_METRIC_NAME, state.getWorkflowRunId());

    List<Decision> decisions = new ArrayList<>();

    String currentStepResultCode = state.getCurrentStepResultCode();

    Map<String, String> nextStepInput = null;
    if (currentStep == null) {
        // the workflow has just started, so the next step's input is the workflow input
        nextStepInput = new HashMap<>(state.getWorkflowInput());

        // Add some workflow attributes to be available to all steps/hooks/etc
        nextStepInput.put(StepAttributes.WORKFLOW_ID, StepAttributes.encode(state.getWorkflowId()));
        nextStepInput.put(StepAttributes.WORKFLOW_EXECUTION_ID, StepAttributes.encode(state.getWorkflowRunId()));
        nextStepInput.put(StepAttributes.WORKFLOW_START_TIME,
                          StepAttributes.encode(state.getWorkflowStartDate()));
    } else {
        // build the next step's input based on the previous step's input and output
        Map<String, PartitionState> currentStepPartitions = state.getLatestPartitionStates(activityName);
        // NOTE(review): if no partition states are returned, nextStepInput stays null and is passed on as-is below.
        if (currentStepPartitions != null) {
            // Here we just want any arbitrary partition's state, we're just trying to determine the next step's input.
            // We need to make sure we only consider partitions that have state; there is an edge case where
            // we can know about a partition but not have any state for it if SWF failed the first schedule attempt.
            // Note that the .orElse() here can't trigger since we are working against the most recently executed step,
            // for which we always have state.
            // If we didn't have state, then the workflow just started and currentStep would be null, which is handled above.
            PartitionState partition = currentStepPartitions.values().stream()
                    .filter(Objects::nonNull).findFirst().orElse(null);
            nextStepInput = new HashMap<>(partition.getAttemptInput());

            boolean currentStepIsPartitioned = PartitionedWorkflowStep.class.isAssignableFrom(currentStep.getClass());
            if (!currentStepIsPartitioned) {
                // Non-partitioned steps can pass along their output to subsequent steps.
                // Retry count and result code will be stripped out below.
                nextStepInput.putAll(partition.getAttemptOutput());
            }

            // Strip out fields that are specific to each attempt, they will be populated below as needed.
            nextStepInput.remove(StepAttributes.PARTITION_ID);
            nextStepInput.remove(StepAttributes.PARTITION_COUNT);
            nextStepInput.remove(StepAttributes.RETRY_ATTEMPT);
            nextStepInput.remove(StepAttributes.RESULT_CODE);
            nextStepInput.remove(StepAttributes.ACTIVITY_COMPLETION_MESSAGE);
        }
    }

    NextStepSelection selection = findNextStep(workflow, currentStep, currentStepResultCode);

    String nextStepNameForContext = null;
    Map<String, String> contextAttributes = new HashMap<>();

    if (selection.workflowShouldClose() || state.isWorkflowCancelRequested()) {
        // If workflow cancellation was requested, we need to check whether any individual tasks need to be canceled.
        if (state.isWorkflowCancelRequested()) {
            // Cancellation may be requested before any step has run (activityName is null then), in which case
            // there may be no partition states at all. Guard against a null map here, consistent with the
            // null check performed when building the next step's input above.
            Map<String, PartitionState> latestPartitions = state.getLatestPartitionStates(activityName);
            if (latestPartitions != null) {
                for (PartitionState partition : latestPartitions.values()) {
                    // For each in-flight partition missing an attempt result (meaning, a worker is actively working it),
                    // request cancellation of the activity task.
                    if (partition != null && partition.getAttemptResult() == null) {
                        decisions.add(buildRequestCancelActivityTaskDecision(partition.getActivityId()));
                    }
                }
            }

            // Also cancel any open retry timers.
            for (String timerId : state.getOpenTimers().keySet()) {
                decisions.add(buildCancelTimerDecision(timerId));
            }
        }

        // handleWorkflowCompletion may return null, in which case no completion decision is added.
        Decision decision = handleWorkflowCompletion(workflow, workflowId, state, metrics);
        if (decision != null) {
            decisions.add(decision);
        }

        // if the workflow is ending and this is a periodic workflow, we set the 'next step' field in the execution context
        // to _delayExit to help indicate the reason that the workflow execution hasn't actually closed yet.
        if (workflow.getClass().isAnnotationPresent(Periodic.class)) {
            nextStepNameForContext = DELAY_EXIT_TIMER_ID;
        }
    } else if (selection.isNextStepUnknown()) {
        // currentStep can't be null if the next step is unknown, since if the current step is null,
        // we _always_ schedule the first step of the workflow.
        // The context keeps pointing at the current step, since we could not move past it.
        nextStepNameForContext = currentStep.getClass().getSimpleName();

        WorkflowGraphNode nextNode = workflow.getGraph().getNodes().get(currentStep.getClass());
        Map<String, String> resultCodeMap = getResultCodeMapForContext(nextNode);
        contextAttributes.put(EXECUTION_CONTEXT_NEXT_STEP_RESULT_CODES, StepAttributes.encode(resultCodeMap));

        decisions.addAll(handleUnknownResultCode(workflow, currentStep, currentStepResultCode, state,
                                                 resultCodeMap.keySet(), metrics));
    } else {
        // Only emit step-completion metrics when we are actually moving on to a different step.
        if (currentStep != null && currentStep != selection.getNextStep()) {
            Duration stepDuration = Duration.between(state.getCurrentStepFirstScheduledTime(),
                                                     state.getCurrentStepCompletionTime());
            metrics.addDuration(formatStepCompletionTimeMetricName(activityName), stepDuration);

            // retryAttempt+1 because if we didn't retry at all, retryAttempt will be 0, and we want total count
            metrics.addCount(formatStepAttemptCountForCompletionMetricName(activityName),
                             state.getCurrentStepMaxRetryCount() + 1);
        }

        // It's possible we got here after e.g. a CancelWorkflowExecution decision has been made, or if the workflow
        // has otherwise ended. In that case, just bail without making any decisions.
        if (state.isWorkflowExecutionClosed()) {
            log.warn("Workflow {} is already closed, so no new tasks can be scheduled.", workflowId);
        } else {
            decisions.addAll(handleStepScheduling(workflow, workflowId, selection.getNextStep(), state, nextStepInput,
                                                  exponentialBackoffBase, metrics, metricsFactory));

            nextStepNameForContext = selection.getNextStep().getClass().getSimpleName();

            WorkflowGraphNode nextNode = workflow.getGraph().getNodes().get(selection.getNextStep().getClass());
            Map<String, String> resultCodeMap = getResultCodeMapForContext(nextNode);
            contextAttributes.put(EXECUTION_CONTEXT_NEXT_STEP_RESULT_CODES, StepAttributes.encode(resultCodeMap));
        }
    }

    if (nextStepNameForContext != null) {
        contextAttributes.put(EXECUTION_CONTEXT_NEXT_STEP_NAME, StepAttributes.encode(nextStepNameForContext));
    }

    // Leave the execution context unset (null) when there is nothing to record.
    String executionContext = null;
    if (!contextAttributes.isEmpty()) {
        executionContext = StepAttributes.encode(contextAttributes);
    }

    return RespondDecisionTaskCompletedRequest.builder().decisions(decisions).executionContext(executionContext).build();
}