in flux/src/main/java/software/amazon/aws/clients/swf/flux/FluxCapacitorImpl.java [475:527]
/**
 * Builds, for every task list (including its bucketed variants), the executor that runs
 * decision work, the executor that runs activity work, and the poller service for each,
 * and records them in the four per-task-list map fields.
 *
 * <p>The poller identity is derived from the local hostname, first shortened via
 * {@code shortenHostnameForIdentity} and then passed through the transformer supplied by
 * the config.
 *
 * @throws RuntimeException if the local hostname cannot be resolved
 */
private void initializePollers() {
    String localHostname;
    try {
        localHostname = shortenHostnameForIdentity(InetAddress.getLocalHost().getHostName());
    } catch (UnknownHostException e) {
        throw new RuntimeException("Unable to determine hostname", e);
    }
    final String pollerHostname = config.getHostnameTransformerForPollerIdentity().apply(localHostname);

    // Start from empty maps; each one is keyed by (possibly bucketed) task list name.
    deciderThreadsPerTaskList = new HashMap<>();
    decisionTaskPollerThreadsPerTaskList = new HashMap<>();
    workerThreadsPerTaskList = new HashMap<>();
    activityTaskPollerThreadsPerTaskList = new HashMap<>();

    // Use the configured backoff base when one is set, otherwise the default.
    final double backoffBase;
    if (config.getExponentialBackoffBase() == null) {
        backoffBase = DEFAULT_EXPONENTIAL_BACKOFF_BASE;
    } else {
        backoffBase = config.getExponentialBackoffBase();
    }

    // Gather the distinct task list names declared by the registered workflows.
    Set<String> baseTaskLists = new HashSet<>();
    for (Workflow workflow : workflowsByName.values()) {
        baseTaskLists.add(workflow.taskList());
    }

    // Expand each base name into its buckets. Bucket 1 is the bare base name (already
    // present via the copy below), so only buckets 2..bucketCount need to be synthesized.
    // See synthesizeBucketedTaskListName.
    Set<String> allTaskLists = new HashSet<>(baseTaskLists);
    for (String baseName : baseTaskLists) {
        for (int bucket = 2; bucket <= config.getTaskListConfig(baseName).getBucketCount(); bucket++) {
            allTaskLists.add(synthesizeBucketedTaskListName(baseName, bucket));
        }
    }

    // NOTE(review): the thread counts below are looked up with the bucketed task list name,
    // not the base name -- confirm getTaskListConfig resolves suffixed names as intended.
    for (String taskList : allTaskLists) {
        // --- Decision side: worker pool first, then the pollers that feed it. ---
        final int deciderThreadCount = config.getTaskListConfig(taskList).getDecisionTaskThreadCount();
        final String deciderPoolName = String.format("%s-%s", "decider", taskList);
        deciderThreadsPerTaskList.put(taskList,
                new BlockOnSubmissionThreadPoolExecutor(deciderThreadCount, deciderPoolName));

        final int decisionPollerCount = config.getTaskListConfig(taskList).getDecisionTaskPollerThreadCount();
        // The decider pool is fetched from the map whenever the factory lambda runs.
        final ScheduledExecutorService decisionPollers = createExecutorService(
                taskList, pollerHostname, "decisionPoller", decisionPollerCount,
                identity -> new DecisionTaskPoller(metricsFactory, swf, workflowDomain, taskList, identity,
                                                   backoffBase, workflowsByName, activitiesByName,
                                                   deciderThreadsPerTaskList.get(taskList)));
        decisionTaskPollerThreadsPerTaskList.put(taskList, decisionPollers);

        // --- Activity side: same shape as above. ---
        final int workerThreadCount = config.getTaskListConfig(taskList).getActivityTaskThreadCount();
        final String workerPoolName = String.format("%s-%s", "activityWorker", taskList);
        workerThreadsPerTaskList.put(taskList,
                new BlockOnSubmissionThreadPoolExecutor(workerThreadCount, workerPoolName));

        final int activityPollerCount = config.getTaskListConfig(taskList).getActivityTaskPollerThreadCount();
        final ScheduledExecutorService activityPollers = createExecutorService(
                taskList, pollerHostname, "activityPoller", activityPollerCount,
                identity -> new ActivityTaskPoller(metricsFactory, swf, workflowDomain, taskList, identity,
                                                   workflowsByName, activitiesByName,
                                                   workerThreadsPerTaskList.get(taskList)));
        activityTaskPollerThreadsPerTaskList.put(taskList, activityPollers);
    }
}