private static Decision handleWorkflowCompletion()

in flux/src/main/java/software/amazon/aws/clients/swf/flux/poller/DecisionTaskPoller.java [841:967]


    /**
     * Produces the decision that ends (or, for periodic workflows, delays and then restarts) a workflow whose
     * last step has finished, and records the workflow- and step-level completion metrics.
     *
     * Outcomes, in order of precedence:
     * <ul>
     *   <li>Periodic workflow whose delayExit timer has not fired and is still open: returns {@code null}.</li>
     *   <li>Periodic workflow whose delayExit timer has not been started yet: emits the completion metrics and
     *       returns a START_TIMER decision for the delayExit timer.</li>
     *   <li>Periodic workflow whose delayExit timer has fired: returns CONTINUE_AS_NEW_WORKFLOW_EXECUTION.</li>
     *   <li>Cancellation requested: returns CANCEL_WORKFLOW_EXECUTION.</li>
     *   <li>Last step's result code equals {@code StepResult.FAIL_RESULT_CODE}: returns FAIL_WORKFLOW_EXECUTION.</li>
     *   <li>Otherwise: returns COMPLETE_WORKFLOW_EXECUTION.</li>
     * </ul>
     *
     * @param workflow   the workflow definition; its {@link Periodic} annotation (if any) drives the delayed exit
     * @param workflowId identifier used only for log messages here
     * @param state      the workflow state used to read timers, dates, the current step and its result
     * @param metrics    recorder that receives the completion-time and attempt-count metrics
     * @return the decision to submit, or {@code null} when a delayExit timer is still open and nothing should be done
     */
    private static Decision handleWorkflowCompletion(Workflow workflow, String workflowId, WorkflowState state,
                                                     MetricRecorder metrics) {
        String workflowName = TaskNaming.workflowName(workflow);
        String finalActivityName = state.getCurrentActivityName();

        // if we're completing a periodic workflow, we need to set a delayExit timer unless it has already fired
        boolean isPeriodicWorkflow = workflow.getClass().isAnnotationPresent(Periodic.class);
        boolean delayExitTimerHasFired = state.getClosedTimers().containsKey(DELAY_EXIT_TIMER_ID);
        if (isPeriodicWorkflow && !delayExitTimerHasFired) {

            // if the timer is still open we should do nothing here. Returning a null decision will do the trick.
            if (state.getOpenTimers().containsKey(DELAY_EXIT_TIMER_ID)) {
                log.debug("Processed a decision for {} but there was still an open timer, no action taken.", workflowId);
                return null;
            }

            Periodic periodicConfig = workflow.getClass().getAnnotation(Periodic.class);

            long runIntervalSeconds = periodicConfig.intervalUnits().toSeconds(periodicConfig.runInterval());
            Instant expectedWorkflowEnd = state.getWorkflowStartDate().plusSeconds(runIntervalSeconds);

            // we are probably going to delay exit, but for the purposes of completion time we want to exclude the delay.
            // These metrics are emitted here, when the delayExit timer is scheduled, and are deliberately NOT emitted
            // again once the timer fires and the workflow actually ends (see the !isPeriodicWorkflow check below).
            Duration executionDuration = Duration.between(state.getWorkflowStartDate(),
                                                          state.getCurrentStepCompletionTime());
            metrics.addDuration(formatWorkflowCompletionTimeMetricName(workflowName), executionDuration);

            // we also want to emit the execution time for the last step, again excluding the delay.
            if (finalActivityName != null) {
                emitFinalStepCompletionMetrics(state, finalActivityName, state.getCurrentStepCompletionTime(), metrics);
            }

            // Figure out how much time is left between now and the expected workflow end date
            Duration remaining = Duration.between(Instant.now(), expectedWorkflowEnd);

            // always delay at least one second just so there's always a timer when we handle these workflows
            // (this also covers the case where the expected end date is already in the past).
            long delayInSeconds = Math.max(1, remaining.getSeconds());

            StartTimerDecisionAttributes attrs = buildStartTimerDecisionAttrs(DELAY_EXIT_TIMER_ID, delayInSeconds, null);

            Decision decision = Decision.builder().decisionType(DecisionType.START_TIMER)
                                                  .startTimerDecisionAttributes(attrs)
                                                  .build();

            log.debug("Periodic Workflow {} will close after a delayed exit in {} seconds.", workflowId, delayInSeconds);
            return decision;
        }

        // At this point we know we don't need to set a delayExit timer
        Decision decision;

        // The workflow has ended, but we need to determine whether it succeeded or failed.
        final String resultCode = state.getCurrentStepResultCode();
        if (isPeriodicWorkflow) {
            // if this was a periodic workflow, we'll return a ContinueAsNew decision so that it restarts immediately.
            // NOTE(review): this check precedes the cancel check, so a periodic workflow that reaches this point is
            // continued-as-new even if a cancel was requested -- confirm that this is intended.
            ContinueAsNewWorkflowExecutionDecisionAttributes attrs
                    = ContinueAsNewWorkflowExecutionDecisionAttributes.builder()
                        .childPolicy(ChildPolicy.TERMINATE)
                        // map values are already serialized in this workflow's original input
                        .input(StepAttributes.encode(state.getWorkflowInput()))
                        .taskStartToCloseTimeout(FluxCapacitorImpl.DEFAULT_DECISION_TASK_TIMEOUT)
                        .executionStartToCloseTimeout(Long.toString(workflow.maxStartToCloseDuration().getSeconds()))
                        .taskList(TaskList.builder().name(workflow.taskList()).build())
                        .build();

            decision = Decision.builder().decisionType(DecisionType.CONTINUE_AS_NEW_WORKFLOW_EXECUTION)
                                         .continueAsNewWorkflowExecutionDecisionAttributes(attrs)
                                         .build();
        } else if (state.isWorkflowCancelRequested()) {
            // If a workflow cancellation was requested, cancel the workflow
            CancelWorkflowExecutionDecisionAttributes attrs = CancelWorkflowExecutionDecisionAttributes.builder().build();
            decision = Decision.builder().decisionType(DecisionType.CANCEL_WORKFLOW_EXECUTION)
                                         .cancelWorkflowExecutionDecisionAttributes(attrs)
                                         .build();
        } else if (StepResult.FAIL_RESULT_CODE.equals(resultCode)) {
            // terminate the workflow as failed if the last step had a failed result.
            // NOTE(review): the metrics below add 1 to getCurrentStepMaxRetryCount() to count the original attempt,
            // but this message reports the raw value as "attempts" -- possibly off by one; left unchanged because
            // the reason string is externally visible. Confirm before changing.
            String reason = finalActivityName + " failed after " + state.getCurrentStepMaxRetryCount() + " attempts.";
            String details = state.getCurrentStepLastActivityCompletionMessage();
            if (details == null) {
                details = "No details were provided by the last activity: " + finalActivityName;
            }

            FailWorkflowExecutionDecisionAttributes attrs = FailWorkflowExecutionDecisionAttributes.builder()
                    .reason(reason).details(details).build();

            decision = Decision.builder().decisionType(DecisionType.FAIL_WORKFLOW_EXECUTION)
                                         .failWorkflowExecutionDecisionAttributes(attrs)
                                         .build();
        } else {
            CompleteWorkflowExecutionDecisionAttributes attrs = CompleteWorkflowExecutionDecisionAttributes.builder().build();
            decision = Decision.builder().decisionType(DecisionType.COMPLETE_WORKFLOW_EXECUTION)
                                         .completeWorkflowExecutionDecisionAttributes(attrs)
                                         .build();
        }

        // Periodic workflows have these metrics emitted *before* the delayExit fires, don't do it again in that case.
        if (!isPeriodicWorkflow) {
            // For a cancelled workflow, the workflow-level duration ends at the cancel request, not at the last step.
            Instant workflowCompletionDate = state.getCurrentStepCompletionTime();
            if (state.isWorkflowCancelRequested()) {
                workflowCompletionDate = state.getWorkflowCancelRequestDate();
            }
            Duration executionDuration = Duration.between(state.getWorkflowStartDate(), workflowCompletionDate);
            metrics.addDuration(formatWorkflowCompletionTimeMetricName(workflowName), executionDuration);

            if (finalActivityName != null) {
                // The step-level duration prefers the step's own completion time and only falls back to the
                // cancel request date when the step has no completion time and a cancel was requested.
                Instant stepCompletionDate = state.getCurrentStepCompletionTime();
                if (stepCompletionDate == null && state.isWorkflowCancelRequested()) {
                    stepCompletionDate = state.getWorkflowCancelRequestDate();
                }
                emitFinalStepCompletionMetrics(state, finalActivityName, stepCompletionDate, metrics);
            }
        }

        // Report the actual outcome; this point is reached for continue-as-new, cancel, fail and complete alike.
        log.debug("Workflow {} will be closed with decision type {}.", workflowId, decision.decisionType());
        return decision;
    }

    /**
     * Emits the completion-time and attempt-count metrics for the workflow's final step.
     *
     * @param state              workflow state supplying the step's first-scheduled time and retry count
     * @param finalActivityName  name of the final activity, used to build both metric names
     * @param stepCompletionDate instant at which the step is considered complete
     * @param metrics            recorder that receives the metrics
     */
    private static void emitFinalStepCompletionMetrics(WorkflowState state, String finalActivityName,
                                                       Instant stepCompletionDate, MetricRecorder metrics) {
        Duration stepCompletionTime = Duration.between(state.getCurrentStepFirstScheduledTime(), stepCompletionDate);
        metrics.addDuration(formatStepCompletionTimeMetricName(finalActivityName), stepCompletionTime);
        // +1 because we want to count the original attempt
        metrics.addCount(formatStepAttemptCountForCompletionMetricName(finalActivityName),
                         state.getCurrentStepMaxRetryCount() + 1);
    }