in flux/src/main/java/software/amazon/aws/clients/swf/flux/poller/DecisionTaskPoller.java [841:967]
private static Decision handleWorkflowCompletion(Workflow workflow, String workflowId, WorkflowState state,
MetricRecorder metrics) {
String workflowName = TaskNaming.workflowName(workflow);
String finalActivityName = state.getCurrentActivityName();
// if we're completing a periodic workflow, we need to set a delayExit timer unless it has already fired
boolean isPeriodicWorkflow = workflow.getClass().isAnnotationPresent(Periodic.class);
boolean delayExitTimerHasFired = state.getClosedTimers().containsKey(DELAY_EXIT_TIMER_ID);
if (isPeriodicWorkflow && !delayExitTimerHasFired) {
// if the timer is still open we should do nothing here. Returning a null decision will do the trick.
if (state.getOpenTimers().containsKey(DELAY_EXIT_TIMER_ID)) {
log.debug("Processed a decision for {} but there was still an open timer, no action taken.", workflowId);
return null;
}
Periodic periodicConfig = workflow.getClass().getAnnotation(Periodic.class);
long runIntervalSeconds = periodicConfig.intervalUnits().toSeconds(periodicConfig.runInterval());
Instant expectedWorkflowEnd = state.getWorkflowStartDate().plusSeconds(runIntervalSeconds);
// we are probably going to delay exit, but for the purposes of completion time we want to exclude the delay.
Duration executionDuration = Duration.between(state.getWorkflowStartDate(), state.getCurrentStepCompletionTime());
metrics.addDuration(formatWorkflowCompletionTimeMetricName(workflowName), executionDuration);
// we also want to emit the execution time for the last step, again excluding the delay.
// for periodic workflows, we don't want to emit the completion time metric again when the workflow ends,
// because it was already emitted once when the delayExit timer was scheduled.
if (state.getCurrentActivityName() != null) {
Duration stepCompletionTime = Duration.between(state.getCurrentStepFirstScheduledTime(),
state.getCurrentStepCompletionTime());
metrics.addDuration(formatStepCompletionTimeMetricName(state.getCurrentActivityName()), stepCompletionTime);
// +1 because we want to count the original attempt
metrics.addCount(formatStepAttemptCountForCompletionMetricName(finalActivityName),
state.getCurrentStepMaxRetryCount() + 1);
}
// Figure out how much time is left between now and the expected workflow end date
Duration duration = Duration.between(Instant.now(), expectedWorkflowEnd);
// always delay at least one second just so there's always a timer when we handle these workflows
long delayInSeconds = Math.max(1, duration.getSeconds());
StartTimerDecisionAttributes attrs = buildStartTimerDecisionAttrs(DELAY_EXIT_TIMER_ID, delayInSeconds, null);
Decision decision = Decision.builder().decisionType(DecisionType.START_TIMER)
.startTimerDecisionAttributes(attrs)
.build();
log.debug("Periodic Workflow {} will close after a delayed exit in {} seconds.", workflowId, delayInSeconds);
return decision;
}
// At this point we know we don't need to set a delayExit timer
Decision decision;
// The workflow has ended, but we need to determine whether it succeeded or failed.
final String resultCode = state.getCurrentStepResultCode();
if (isPeriodicWorkflow) {
// if this was a periodic workflow, we'll return a ContinueAsNew decision so that it restarts immediately.
ContinueAsNewWorkflowExecutionDecisionAttributes attrs
= ContinueAsNewWorkflowExecutionDecisionAttributes.builder()
.childPolicy(ChildPolicy.TERMINATE)
// map values are already serialized in this workflow's original input
.input(StepAttributes.encode(state.getWorkflowInput()))
.taskStartToCloseTimeout(FluxCapacitorImpl.DEFAULT_DECISION_TASK_TIMEOUT)
.executionStartToCloseTimeout(Long.toString(workflow.maxStartToCloseDuration().getSeconds()))
.taskList(TaskList.builder().name(workflow.taskList()).build())
.build();
decision = Decision.builder().decisionType(DecisionType.CONTINUE_AS_NEW_WORKFLOW_EXECUTION)
.continueAsNewWorkflowExecutionDecisionAttributes(attrs)
.build();
} else if (state.isWorkflowCancelRequested()) {
// If a workflow cancellation was requested, cancel the workflow
CancelWorkflowExecutionDecisionAttributes attrs = CancelWorkflowExecutionDecisionAttributes.builder().build();
decision = Decision.builder().decisionType(DecisionType.CANCEL_WORKFLOW_EXECUTION)
.cancelWorkflowExecutionDecisionAttributes(attrs)
.build();
} else if (StepResult.FAIL_RESULT_CODE.equals(resultCode)) {
// terminate the workflow as failed if the last step had a failed result.
String reason = finalActivityName + " failed after " + state.getCurrentStepMaxRetryCount() + " attempts.";
String details = state.getCurrentStepLastActivityCompletionMessage();
if (details == null) {
details = "No details were provided by the last activity: " + finalActivityName;
}
FailWorkflowExecutionDecisionAttributes attrs = FailWorkflowExecutionDecisionAttributes.builder()
.reason(reason).details(details).build();
decision = Decision.builder().decisionType(DecisionType.FAIL_WORKFLOW_EXECUTION)
.failWorkflowExecutionDecisionAttributes(attrs)
.build();
} else {
CompleteWorkflowExecutionDecisionAttributes attrs = CompleteWorkflowExecutionDecisionAttributes.builder().build();
decision = Decision.builder().decisionType(DecisionType.COMPLETE_WORKFLOW_EXECUTION)
.completeWorkflowExecutionDecisionAttributes(attrs)
.build();
}
// Periodic workflows have these metrics emitted *before* the delayExit fires, don't do it again in that case.
if (!isPeriodicWorkflow) {
Instant completionDate = state.getCurrentStepCompletionTime();
if (state.isWorkflowCancelRequested()) {
completionDate = state.getWorkflowCancelRequestDate();
}
Duration executionDuration = Duration.between(state.getWorkflowStartDate(), completionDate);
metrics.addDuration(formatWorkflowCompletionTimeMetricName(workflowName), executionDuration);
if (state.getCurrentActivityName() != null) {
completionDate = state.getCurrentStepCompletionTime();
if (completionDate == null && state.isWorkflowCancelRequested()) {
completionDate = state.getWorkflowCancelRequestDate();
}
Duration stepCompletionTime = Duration.between(state.getCurrentStepFirstScheduledTime(),
completionDate);
metrics.addDuration(formatStepCompletionTimeMetricName(state.getCurrentActivityName()), stepCompletionTime);
// +1 because we want to count the original attempt
metrics.addCount(formatStepAttemptCountForCompletionMetricName(finalActivityName),
state.getCurrentStepMaxRetryCount() + 1);
}
}
log.debug("Workflow {} will be closed as successful.", workflowId);
return decision;
}