in src/main/java/com/amazonaws/services/simpleworkflow/flow/worker/AsyncDecisionTaskHandler.java [185:240]
/**
 * Produces the {@link AsyncDecider} used to handle the decision task carried by {@code historyHelper}.
 * <p>
 * The method first resolves the workflow definition factory for the task's workflow type and fails
 * if none is registered. When an affinity helper is configured and reports itself as an affinity
 * worker, a cached decider is looked up for the task: on a hit the cached decider is re-pointed at
 * the supplied history helper and returned; on a miss a replacement history helper is obtained from
 * the affinity helper. In every remaining case a brand-new decider is constructed.
 *
 * @param historyHelper source of the decision task; on a cache hit it receives the cached decider's
 *                      component versions and workflow context data before being installed on it
 * @return the cached decider when one is found, otherwise a newly constructed decider
 * @throws IncompatibleWorkflowDefinition if no definition factory is registered for the workflow type
 * @throws Exception declared by the signature; the visible body does not show which callee raises it
 */
private AsyncDecider createDecider(HistoryHelper historyHelper) throws Exception {
    final PollForDecisionTaskResponse task = historyHelper.getDecisionTask();
    final WorkflowType requestedType = fromSdkType(task.workflowType());
    if (log.isDebugEnabled()) {
        log.debug("WorkflowTask received: taskId=" + task.startedEventId() + ", taskToken="
                + task.taskToken() + ", workflowExecution=" + task.workflowExecution());
    }

    final WorkflowDefinitionFactory definitionFactory =
            definitionFactoryFactory.getWorkflowDefinitionFactory(requestedType);
    if (definitionFactory == null) {
        // Unknown workflow type: emit the metric and the error log first, then fail with the
        // list of types this worker does register so the mismatch is easy to diagnose.
        ThreadLocalMetrics.getMetrics().recordCount(MetricName.TYPE_NOT_FOUND.getName(), 1,
                MetricName.getWorkflowTypeDimension(requestedType));
        log.error("Received decision task for workflow type not configured with a worker: workflowType="
                + task.workflowType() + ", taskToken=" + task.taskToken() + ", workflowExecution="
                + task.workflowExecution());
        final Iterable<WorkflowType> registeredTypes = definitionFactoryFactory.getWorkflowTypesToRegister();
        final StringBuilder registeredList = new StringBuilder("[");
        for (WorkflowType registered : registeredTypes) {
            // Anything beyond the opening bracket means an entry was already written.
            if (registeredList.length() > 1) {
                registeredList.append(", ");
            }
            registeredList.append(registered);
        }
        registeredList.append("]");
        throw new IncompatibleWorkflowDefinition("Workflow type \"" + requestedType
                + "\" is not supported by the WorkflowWorker. "
                + "Possible cause is workflow type version change without changing task list name. "
                + "Workflow types registered by the worker are: " + registeredList.toString());
    }

    // A cached decider is only consulted when an affinity helper exists and is an affinity worker.
    final boolean useDeciderCache = affinityHelper != null && affinityHelper.isAffinityWorker();
    if (useDeciderCache) {
        final AsyncDecider cachedDecider = affinityHelper.getDeciderForDecisionTask(task);
        if (cachedDecider != null) {
            // Carry the cached decider's component versions and workflow context data over to the
            // incoming history helper before it replaces the old one.
            historyHelper.setComponentVersions(cachedDecider.getHistoryHelper().getComponentVersions());
            historyHelper.setWorkflowContextData(cachedDecider.getHistoryHelper().getWorkflowContextData());
            // Skip the DecisionTaskStarted event this cached decider has seen
            historyHelper.getSingleDecisionEvents();
            // Hand the new history events to the cached decider
            cachedDecider.setHistoryHelper(historyHelper);
            ThreadLocalMetrics.getMetrics().recordCount(MetricName.VALID_DECIDER_FOUND_IN_CACHE.getName(), 1,
                    MetricName.getWorkflowTypeDimension(fromSdkType(task.workflowType())));
            return cachedDecider;
        }
        // Cache miss: a replay over the entire history is needed, so swap in the history helper
        // supplied by the affinity helper, then record the miss as a zero count.
        historyHelper = affinityHelper.createHistoryHelperForDecisionTask(task);
        ThreadLocalMetrics.getMetrics().recordCount(MetricName.VALID_DECIDER_FOUND_IN_CACHE.getName(), 0,
                MetricName.getWorkflowTypeDimension(fromSdkType(task.workflowType())));
    }

    final DecisionsHelper decisions = new DecisionsHelper(task, childWorkflowIdHandler, clientConfig);
    final WorkflowTypeImplementationOptions implementationOptions =
            definitionFactory.getWorkflowImplementationOptions();
    if (implementationOptions != null) {
        // Register the implementation's component versions on the history helper in use.
        final List<WorkflowTypeComponentImplementationVersion> implementationVersions =
                implementationOptions.getImplementationComponentVersions();
        historyHelper.getComponentVersions().setWorkflowImplementationComponentVersions(implementationVersions);
    }
    return new AsyncDecider(definitionFactory, historyHelper, decisions);
}