private void initializePollers()

in flux/src/main/java/software/amazon/aws/clients/swf/flux/FluxCapacitorImpl.java [475:527]


    /**
     * (Re)creates the per-task-list executors used for polling and for running decision and activity tasks.
     *
     * The method:
     * <ol>
     *   <li>derives the hostname passed to the pollers: the local host name, run through
     *       {@code shortenHostnameForIdentity} and then through the transformer supplied by the config;</li>
     *   <li>replaces the four per-task-list executor maps with fresh, empty maps;</li>
     *   <li>gathers the task list of every registered workflow, plus the synthesized names
     *       for buckets 2 through the configured bucket count of each of those task lists;</li>
     *   <li>for each resulting task list, creates a decider pool, a decision poller service,
     *       an activity worker pool and an activity poller service, in that order.</li>
     * </ol>
     *
     * @throws RuntimeException if the local host name cannot be resolved
     */
    private void initializePollers() {
        String localHostname;
        try {
            localHostname = shortenHostnameForIdentity(InetAddress.getLocalHost().getHostName());
        } catch (UnknownHostException e) {
            throw new RuntimeException("Unable to determine hostname", e);
        }
        String pollerHostname = config.getHostnameTransformerForPollerIdentity().apply(localHostname);

        // Start from empty maps; the loop at the bottom fills in one entry per task list.
        decisionTaskPollerThreadsPerTaskList = new HashMap<>();
        deciderThreadsPerTaskList = new HashMap<>();
        activityTaskPollerThreadsPerTaskList = new HashMap<>();
        workerThreadsPerTaskList = new HashMap<>();

        // Use the configured backoff base when one is set, otherwise the class default.
        final double backoffBase;
        if (config.getExponentialBackoffBase() == null) {
            backoffBase = DEFAULT_EXPONENTIAL_BACKOFF_BASE;
        } else {
            backoffBase = config.getExponentialBackoffBase();
        }

        // The distinct task lists declared by the registered workflows.
        Set<String> baseTaskLists = new HashSet<>();
        for (Workflow workflow : workflowsByName.values()) {
            baseTaskLists.add(workflow.taskList());
        }

        // Base names plus the synthesized name of every additional bucket.
        Set<String> allTaskLists = new HashSet<>(baseTaskLists);
        for (String baseTaskList : baseTaskLists) {
            // Bucket 1 is the base name with no suffix and is already in the set, so numbering
            // starts at 2. See synthesizeBucketedTaskListName.
            for (int bucket = 2; bucket <= config.getTaskListConfig(baseTaskList).getBucketCount(); bucket++) {
                allTaskLists.add(synthesizeBucketedTaskListName(baseTaskList, bucket));
            }
        }

        // NOTE(review): the thread counts below are looked up by the (possibly bucketed) task list name,
        // not by the base name -- confirm getTaskListConfig resolves bucketed names as intended.
        for (String taskList : allTaskLists) {
            // Pool that executes decision tasks for this task list.
            int deciderThreadCount = config.getTaskListConfig(taskList).getDecisionTaskThreadCount();
            String deciderPoolName = String.format("%s-%s", "decider", taskList);
            deciderThreadsPerTaskList.put(taskList,
                                          new BlockOnSubmissionThreadPoolExecutor(deciderThreadCount, deciderPoolName));

            // Pollers that hand decision tasks to the decider pool registered just above.
            int decisionPollerCount = config.getTaskListConfig(taskList).getDecisionTaskPollerThreadCount();
            ScheduledExecutorService decisionPollers = createExecutorService(
                taskList, pollerHostname, "decisionPoller", decisionPollerCount,
                name -> new DecisionTaskPoller(metricsFactory, swf, workflowDomain, taskList, name,
                                               backoffBase, workflowsByName, activitiesByName,
                                               deciderThreadsPerTaskList.get(taskList)));
            decisionTaskPollerThreadsPerTaskList.put(taskList, decisionPollers);

            // Pool that executes activity tasks for this task list.
            int workerThreadCount = config.getTaskListConfig(taskList).getActivityTaskThreadCount();
            String workerPoolName = String.format("%s-%s", "activityWorker", taskList);
            workerThreadsPerTaskList.put(taskList,
                                         new BlockOnSubmissionThreadPoolExecutor(workerThreadCount, workerPoolName));

            // Pollers that hand activity tasks to the worker pool registered just above.
            int activityPollerCount = config.getTaskListConfig(taskList).getActivityTaskPollerThreadCount();
            ScheduledExecutorService activityPollers = createExecutorService(
                taskList, pollerHostname, "activityPoller", activityPollerCount,
                name -> new ActivityTaskPoller(metricsFactory, swf, workflowDomain, taskList, name,
                                               workflowsByName, activitiesByName,
                                               workerThreadsPerTaskList.get(taskList)));
            activityTaskPollerThreadsPerTaskList.put(taskList, activityPollers);
        }
    }