public void execute()

in src/main/java/com/amazonaws/services/simpleworkflow/flow/worker/DecisionTaskPoller.java [300:382]


    /**
     * Processes one decision task: delegates to the decision task handler, optionally adjusts the
     * task list of the response for decider affinity, submits the response to the service, and
     * records metrics about the outcome.
     *
     * <p>When {@code shouldEstablishAffinity} returns true for the first task and shutdown has not
     * been requested, the response's task list is overridden with the affinity task list and the
     * decider is either evicted from the decider cache (if it completed without an unhandled
     * decision) or cached after its history events are cleared. If shutdown has been requested,
     * the response's task list is set back to the decider's original task list.
     *
     * <p>The per-task {@code Metrics} instance is always closed and the thread-local metrics are
     * always cleared before this method returns, including when setup, task handling, or the
     * outcome bookkeeping fails.
     *
     * @param tasks iterator over the decision task; its first decision task supplies the
     *              identifiers used for logging, caching and metric dimensions
     * @throws Exception any exception raised while handling or submitting the task; it is logged
     *                   and rethrown unchanged ({@code Error}s are likewise logged and rethrown)
     */
    public void execute(final DecisionTaskIterator tasks) throws Exception {
        final Metrics metrics = metricsRegistry.newMetrics(MetricName.Operation.EXECUTE_DECISION_TASK.getName());
        try {
            final PollForDecisionTaskResponse firstTask = tasks.getFirstDecisionTask();
            ThreadLocalMetrics.setCurrent(metrics);
            MetricHelper.recordMetrics(firstTask, metrics);
            final boolean establishAffinity = shouldEstablishAffinity(firstTask);
            RespondDecisionTaskCompletedRequest taskCompletedRequest = null;
            // Flipped to true only after respondDecisionTaskCompleted has returned normally.
            boolean decisionSubmitted = false;
            try {
                HandleDecisionTaskResults handleDecisionTaskResults = decisionTaskHandler.handleDecisionTask(tasks);
                taskCompletedRequest = handleDecisionTaskResults.getRespondDecisionTaskCompletedRequest();
                if (decisionsLog.isTraceEnabled()) {
                    decisionsLog.trace(WorkflowExecutionUtils.prettyPrintDecisions(taskCompletedRequest.decisions()));
                }
                taskCompletedRequest = RequestTimeoutHelper.overrideDataPlaneRequestTimeout(taskCompletedRequest, config);

                RespondDecisionTaskCompletedRequest.Builder taskCompletedRequestBuilder = taskCompletedRequest.toBuilder();
                if (establishAffinity) {
                    if (!shutdownRequested) {
                        // Override the task list to affinity task list
                        taskCompletedRequestBuilder.taskList(TaskList.builder().name(decisionTaskHandler.getAffinityHelper().getAffinityTaskList()).build());
                        taskCompletedRequestBuilder.taskListScheduleToStartTimeout(String.valueOf(config.getDeciderAffinityConfig().getAffinityTaskListScheduleToStartTimeout().getSeconds()));
                        AsyncDecider decider = handleDecisionTaskResults.getAsyncDecider();
                        if (decider.hasCompletedWithoutUnhandledDecision()) {
                            // Evict decider from cache upon completion
                            config.getDeciderAffinityConfig().getDeciderCache()
                                .remove(fromSdkType(firstTask.workflowExecution()));
                        } else {
                            // Cache the decider after clearing its history events
                            decider.getHistoryHelper().clearHistoryEvents();
                            config.getDeciderAffinityConfig().getDeciderCache()
                                .put(fromSdkType(firstTask.workflowExecution()), decider);
                        }
                    } else {
                        // Clear the host specific TaskList override if shutdown is initiated to ensure newer decision tasks get scheduled on original TaskList
                        taskCompletedRequestBuilder.taskList(TaskList.builder().name(handleDecisionTaskResults.getAsyncDecider().getOriginalTaskList()).build());
                    }
                }
                taskCompletedRequest = taskCompletedRequestBuilder.build();
                // The lambda below needs an effectively final reference to the request.
                final RespondDecisionTaskCompletedRequest request = taskCompletedRequest;
                metrics.recordRunnable(() -> service.respondDecisionTaskCompleted(request), MetricName.Operation.RESPOND_DECISION_TASK_COMPLETED.getName(), TimeUnit.MILLISECONDS);
                decisionSubmitted = true;
                forceFetchFullHistoryIfNeeded(establishAffinity, firstTask, metrics);
            } catch (Error e) {
                // Errors are only logged at warn level and rethrown; no history/decision dump is attempted.
                if (log.isWarnEnabled()) {
                    log.warn("DecisionTask failure: taskId= " + firstTask.startedEventId() + ", workflowExecution="
                        + firstTask.workflowExecution(), e);
                }
                throw e;
            } catch (Exception e) {
                if (log.isWarnEnabled()) {
                    log.warn("DecisionTask failure: taskId= " + firstTask.startedEventId() + ", workflowExecution="
                        + firstTask.workflowExecution(), e);
                }
                if (log.isDebugEnabled() && firstTask.events() != null) {
                    log.debug("Failed taskId=" + firstTask.startedEventId() + " history: "
                        + WorkflowExecutionUtils.prettyPrintHistory(firstTask.events(), true));
                }
                // taskCompletedRequest is still null if the handler itself failed.
                if (taskCompletedRequest != null && decisionsLog.isWarnEnabled()) {
                    decisionsLog.warn("Failed taskId=" + firstTask.startedEventId() + " decisions="
                        + WorkflowExecutionUtils.prettyPrintDecisions(taskCompletedRequest.decisions()));
                }

                if (e instanceof SdkClientException && BROKEN_PIPE_ERROR_PREDICATE.test((SdkClientException) e)) {
                    log.error("Unable to submit Decisions because request may have exceeded allowed maximum request size");
                    metrics.recordCount(MetricName.REQUEST_SIZE_MAY_BE_EXCEEDED.getName(), 1);
                }

                throw e;
            } finally {
                // The response never reached the service: drop any cached decider for this execution.
                if (establishAffinity && !decisionSubmitted) {
                    config.getDeciderAffinityConfig().getDeciderCache()
                        .remove(fromSdkType(firstTask.workflowExecution()));
                }
                if (taskCompletedRequest != null && taskCompletedRequest.decisions() != null) {
                    ThreadLocalMetrics.getMetrics().recordCount(MetricName.DECISION_COUNT.getName(), taskCompletedRequest.decisions().size(), MetricName.getResultDimension(decisionSubmitted));
                }
                metrics.recordCount(MetricName.DROPPED_TASK.getName(), !decisionSubmitted,
                    MetricName.getWorkflowTypeDimension(fromSdkType(firstTask.workflowType())));
            }
        } finally {
            // Guaranteed cleanup: runs even if setup or the bookkeeping above throws, so this
            // thread is never left with a stale, unclosed Metrics as its current one.
            try {
                metrics.close();
            } finally {
                ThreadLocalMetrics.clearCurrent();
            }
        }
    }