in src/main/java/com/amazonaws/services/simpleworkflow/flow/worker/DecisionTaskPoller.java [300:382]
/**
 * Processes one decision task: passes the task iterator to {@code decisionTaskHandler}, submits the
 * resulting {@code RespondDecisionTaskCompletedRequest} through {@code service}, and records metrics
 * for the attempt.
 *
 * <p>When {@code shouldEstablishAffinity} returns true for the first task, the request's task list is
 * rewritten: to the affinity task list while the poller is running, or back to the decider's original
 * task list once {@code shutdownRequested} is set. While running, the decider is also put into (or,
 * if it has completed without unhandled decisions, removed from) the decider cache.
 *
 * <p>Failures are logged and rethrown unchanged. Regardless of outcome the method records the
 * {@code DECISION_COUNT} and {@code DROPPED_TASK} metrics, evicts the cached decider if affinity was
 * in effect and the decisions were not submitted, closes the metrics object and clears the
 * thread-local metrics reference.
 *
 * @param tasks iterator over the decision task(s) to process; its first task supplies the workflow
 *              execution, workflow type and started-event id used for logging, caching and metrics
 * @throws Exception any exception raised while handling or submitting the task, after it is logged
 */
public void execute(final DecisionTaskIterator tasks) throws Exception {
    final Metrics metrics = metricsRegistry.newMetrics(MetricName.Operation.EXECUTE_DECISION_TASK.getName());
    final PollForDecisionTaskResponse firstTask = tasks.getFirstDecisionTask();
    ThreadLocalMetrics.setCurrent(metrics);
    // Everything after setCurrent() runs under this outer try so the thread-local is cleared and the
    // metrics object is closed even when the setup calls or the per-task bookkeeping below throw.
    try {
        MetricHelper.recordMetrics(firstTask, metrics);
        final boolean establishAffinity = shouldEstablishAffinity(firstTask);
        RespondDecisionTaskCompletedRequest taskCompletedRequest = null;
        boolean decisionSubmitted = false;
        try {
            HandleDecisionTaskResults handleDecisionTaskResults = decisionTaskHandler.handleDecisionTask(tasks);
            taskCompletedRequest = handleDecisionTaskResults.getRespondDecisionTaskCompletedRequest();
            if (decisionsLog.isTraceEnabled()) {
                decisionsLog.trace(WorkflowExecutionUtils.prettyPrintDecisions(taskCompletedRequest.decisions()));
            }
            taskCompletedRequest = RequestTimeoutHelper.overrideDataPlaneRequestTimeout(taskCompletedRequest, config);
            RespondDecisionTaskCompletedRequest.Builder taskCompletedRequestBuilder = taskCompletedRequest.toBuilder();
            if (establishAffinity) {
                if (!shutdownRequested) {
                    // Override the task list to affinity task list
                    taskCompletedRequestBuilder.taskList(TaskList.builder().name(decisionTaskHandler.getAffinityHelper().getAffinityTaskList()).build());
                    taskCompletedRequestBuilder.taskListScheduleToStartTimeout(String.valueOf(config.getDeciderAffinityConfig().getAffinityTaskListScheduleToStartTimeout().getSeconds()));
                    AsyncDecider decider = handleDecisionTaskResults.getAsyncDecider();
                    if (decider.hasCompletedWithoutUnhandledDecision()) {
                        // Evict decider from cache upon completion
                        config.getDeciderAffinityConfig().getDeciderCache()
                                .remove(fromSdkType(firstTask.workflowExecution()));
                    } else {
                        // Cache the decider after clearing its history events
                        decider.getHistoryHelper().clearHistoryEvents();
                        config.getDeciderAffinityConfig().getDeciderCache()
                                .put(fromSdkType(firstTask.workflowExecution()), decider);
                    }
                } else {
                    // Clear the host specific TaskList override if shutdown is initiated to ensure newer decision tasks get scheduled on original TaskList
                    taskCompletedRequestBuilder.taskList(TaskList.builder().name(handleDecisionTaskResults.getAsyncDecider().getOriginalTaskList()).build());
                }
            }
            taskCompletedRequest = taskCompletedRequestBuilder.build();
            // The lambda needs an effectively final reference; taskCompletedRequest is reassigned above.
            final RespondDecisionTaskCompletedRequest request = taskCompletedRequest;
            metrics.recordRunnable(() -> service.respondDecisionTaskCompleted(request), MetricName.Operation.RESPOND_DECISION_TASK_COMPLETED.getName(), TimeUnit.MILLISECONDS);
            // Set before the follow-up call so a failure there is not counted as a dropped task
            // and does not evict the decider that was just cached.
            decisionSubmitted = true;
            forceFetchFullHistoryIfNeeded(establishAffinity, firstTask, metrics);
        } catch (Error e) {
            logDecisionTaskFailure(firstTask, e);
            throw e;
        } catch (Exception e) {
            logDecisionTaskFailure(firstTask, e);
            if (log.isDebugEnabled() && firstTask.events() != null) {
                log.debug("Failed taskId=" + firstTask.startedEventId() + " history: "
                        + WorkflowExecutionUtils.prettyPrintHistory(firstTask.events(), true));
            }
            if (taskCompletedRequest != null && decisionsLog.isWarnEnabled()) {
                decisionsLog.warn("Failed taskId=" + firstTask.startedEventId() + " decisions="
                        + WorkflowExecutionUtils.prettyPrintDecisions(taskCompletedRequest.decisions()));
            }
            if (e instanceof SdkClientException && BROKEN_PIPE_ERROR_PREDICATE.test((SdkClientException) e)) {
                log.error("Unable to submit Decisions because request may have exceeded allowed maximum request size");
                metrics.recordCount(MetricName.REQUEST_SIZE_MAY_BE_EXCEEDED.getName(), 1);
            }
            throw e;
        } finally {
            // A decider cached above must not survive an attempt whose decisions were never submitted.
            if (establishAffinity && !decisionSubmitted) {
                config.getDeciderAffinityConfig().getDeciderCache()
                        .remove(fromSdkType(firstTask.workflowExecution()));
            }
            if (taskCompletedRequest != null && taskCompletedRequest.decisions() != null) {
                ThreadLocalMetrics.getMetrics().recordCount(MetricName.DECISION_COUNT.getName(), taskCompletedRequest.decisions().size(), MetricName.getResultDimension(decisionSubmitted));
            }
            metrics.recordCount(MetricName.DROPPED_TASK.getName(), !decisionSubmitted,
                    MetricName.getWorkflowTypeDimension(fromSdkType(firstTask.workflowType())));
        }
    } finally {
        // Clear the thread-local even if close() itself throws.
        try {
            metrics.close();
        } finally {
            ThreadLocalMetrics.clearCurrent();
        }
    }
}

/**
 * Logs, at WARN level, that handling of the given decision task failed.
 *
 * @param firstTask task whose started-event id and workflow execution identify the failure
 * @param cause     the error or exception that aborted the task; attached to the log record
 */
private void logDecisionTaskFailure(PollForDecisionTaskResponse firstTask, Throwable cause) {
    if (log.isWarnEnabled()) {
        log.warn("DecisionTask failure: taskId= " + firstTask.startedEventId() + ", workflowExecution="
                + firstTask.workflowExecution(), cause);
    }
}