in src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java [129:199]
/**
 * Runs one decision task through a decider and converts the outcome into a completion result.
 *
 * <p>When {@code stickyTaskListName} is {@code null} a decider is created for this task only and
 * is closed before the method returns. Otherwise the decider is obtained through {@code
 * cache.getOrCreate}; a decider that had to be created for this task is added to the cache only
 * after {@code decide} has succeeded, and the cache is always notified through {@code
 * markProcessingDone} once processing ends.
 *
 * @param decisionTask the polled decision task to process
 * @return the value built by {@code createCompletedRequest} from the task and its decision result
 * @throws Throwable any failure raised while creating the decider, deciding, logging or building
 *     the result; it is rethrown unchanged after cleanup
 */
private Result processDecision(PollForDecisionTaskResponse decisionTask) throws Throwable {
  // Read the mode once so that every branch below agrees on it for this invocation.
  final boolean sticky = stickyTaskListName != null;
  Decider decider = null;
  AtomicBoolean createdNew = new AtomicBoolean();
  try {
    if (sticky) {
      decider =
          cache.getOrCreate(
              decisionTask,
              () -> {
                // Remember that the cache had to build a decider for this task.
                createdNew.set(true);
                return createDecider(decisionTask);
              });
    } else {
      decider = createDecider(decisionTask);
    }
    Decider.DecisionResult result = decider.decide(decisionTask);
    // A newly created decider is cached only after it has processed the task successfully.
    if (sticky && createdNew.get()) {
      cache.addToCache(decisionTask, decider);
    }
    if (log.isTraceEnabled()) {
      WorkflowExecution execution = decisionTask.getWorkflowExecution();
      log.trace(
          "WorkflowTask startedEventId="
              + decisionTask.getStartedEventId()
              + ", WorkflowID="
              + execution.getWorkflowId()
              + ", RunID="
              + execution.getRunId()
              + " completed with "
              + WorkflowExecutionUtils.prettyPrintDecisions(result.getDecisions())
              + " forceCreateNewDecisionTask "
              + result.getForceCreateNewDecisionTask());
    } else if (log.isDebugEnabled()) {
      WorkflowExecution execution = decisionTask.getWorkflowExecution();
      log.debug(
          "WorkflowTask startedEventId="
              + decisionTask.getStartedEventId()
              + ", WorkflowID="
              + execution.getWorkflowId()
              + ", RunID="
              + execution.getRunId()
              + " completed with "
              + result.getDecisions().size()
              + " new decisions"
              + " forceCreateNewDecisionTask "
              + result.getForceCreateNewDecisionTask());
    }
    return createCompletedRequest(decisionTask, result);
  } catch (Throwable e) {
    if (sticky) {
      // The decider might not be in the cache even though sticky is on (it is added only after
      // a successful decide), so close it explicitly here. For items that are in the cache the
      // invalidation callback will try to close again, which should be ok.
      if (decider != null) {
        decider.close();
      }
      cache.invalidate(decisionTask.getWorkflowExecution().getRunId());
    }
    // A non-sticky decider is closed exactly once, in the finally block below.
    throw e;
  } finally {
    if (sticky) {
      cache.markProcessingDone(decisionTask);
    } else if (decider != null) {
      // Without sticky execution nothing was registered with the cache for this task, so the
      // only cleanup is closing the decider, if one was created at all.
      decider.close();
    }
  }
}