in src/main/java/com/amazonaws/services/simpleworkflow/flow/worker/GenericWorker.java [82:141]
public void run() {
try {
while(true) {
log.debug("poll task begin");
if (pollingExecutor.isShutdown() || workerExecutor.isShutdown()) {
return;
}
pollBackoffThrottler.throttle();
final int availableWorkers = workerExecutor.getMaximumPoolSize() - workerExecutor.getActiveCount();
if (availableWorkers < 1) {
log.warn("Maximum worker thread capacity reached. Polling will not resume until an existing task completes. Consider increasing the worker thread count.");
pollBackoffThrottler.failure();
return;
}
if (pollingExecutor.isShutdown() || workerExecutor.isShutdown()) {
return;
}
if (pollRateThrottler != null) {
pollRateThrottler.throttle();
}
if (pollingExecutor.isShutdown() || workerExecutor.isShutdown()) {
return;
}
boolean semaphoreNeedsRelease = false;
SuspendableSemaphore pollingSemaphore = poller.getPollingSemaphore();
try {
if (pollingSemaphore != null) {
pollingSemaphore.acquire();
}
semaphoreNeedsRelease = true;
T task = poller.poll();
if (task == null) {
log.debug("no work returned");
return;
}
semaphoreNeedsRelease = false;
try {
workerExecutor.execute(new ExecuteTask(poller, task, pollingSemaphore));
log.debug("poll task end");
} catch (Exception | Error e) {
semaphoreNeedsRelease = true;
throw e;
}
} finally {
if (pollingSemaphore != null && semaphoreNeedsRelease) {
pollingSemaphore.release();
}
}
}
} catch (final Throwable e) {
pollBackoffThrottler.failure();
if (e instanceof LimitExceededException) {
log.info("Received LimitExceededException for over-polling on " + taskListToPoll + " TaskList. This is a cue to reduce poll rate on " + taskListToPoll);
} else if (!(e.getCause() instanceof InterruptedException)) {
uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
}
}
}