in src/main/java/com/amazonaws/services/simpleworkflow/flow/worker/GenericWorker.java [290:336]
public void start() {
if (log.isInfoEnabled()) {
log.info("start: " + this);
}
if (shutdownRequested.get()) {
throw new IllegalStateException("Shutdown Requested. Not restartable.");
}
if (!startRequested.compareAndSet(false, true)) {
return;
}
checkRequiredProperty(service, "service");
checkRequiredProperty(domain, "domain");
checkRequiredProperty(taskListToPoll, "taskListToPoll");
checkRequiredProperties();
if (registerDomain) {
registerDomain();
}
if (!disableTypeRegistrationOnStart) {
registerTypesToPoll();
}
if (maximumPollRatePerSecond > 0.0) {
pollRateThrottler = new Throttler("pollRateThrottler " + taskListToPoll, maximumPollRatePerSecond,
maximumPollRateIntervalMilliseconds);
}
poller = createPoller();
if (suspended) {
poller.suspend();
}
pollBackoffThrottler = new BackoffThrottler(pollBackoffInitialInterval, pollBackoffMaximumInterval,
pollBackoffCoefficient);
workerExecutor = new ThreadPoolExecutor(executeThreadCount, executeThreadCount, 1, TimeUnit.MINUTES,
new SynchronousQueue<>(), new BlockCallerPolicy());
workerExecutor.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
ExecutorThreadFactory pollExecutorThreadFactory = getExecutorThreadFactory("Worker");
workerExecutor.setThreadFactory(pollExecutorThreadFactory);
metricsRegistry.getExecutorServiceMonitor().monitor(workerExecutor, "com.amazonaws.services.simpleworkflow.flow:type=ThreadPoolState,name=" + getWorkerType().getName() + "Worker");
pollingExecutor = new ScheduledThreadPoolExecutor(pollThreadCount, getExecutorThreadFactory("Poller"));
metricsRegistry.getExecutorServiceMonitor().monitor(pollingExecutor, "com.amazonaws.services.simpleworkflow.flow:type=ThreadPoolState,name=" + getWorkerType().getName() + "Poller");
for (int i = 0; i < pollThreadCount; i++) {
pollingExecutor.scheduleWithFixedDelay(new PollingTask(poller), 0, 1, TimeUnit.NANOSECONDS);
}
}