in src/main/java/com/amazonaws/services/sqs/util/SQSMessageConsumer.java [92:137]
private void poll() {
try {
for (;;) {
if (Thread.interrupted() || shuttingDown.get()) {
break;
}
int waitTimeSeconds = maxWaitTimeSeconds;
if (deadlineNanos > 0) {
long currentNanos = System.nanoTime();
if (currentNanos >= deadlineNanos) {
shutdown();
break;
} else {
int secondsRemaining = (int)TimeUnit.NANOSECONDS.toSeconds(deadlineNanos - currentNanos);
waitTimeSeconds = Math.max(0, Math.min(maxWaitTimeSeconds, secondsRemaining));
}
}
try {
ReceiveMessageRequest request = new ReceiveMessageRequest()
.withQueueUrl(queueUrl)
.withWaitTimeSeconds(waitTimeSeconds)
.withMaxNumberOfMessages(10)
.withMessageAttributeNames(ATTRIBUTE_NAMES_ALL)
.withAttributeNames(ATTRIBUTE_NAMES_ALL);
List<Message> messages = sqs.receiveMessage(request).getMessages();
messages.parallelStream().forEach(this::handleMessage);
} catch (QueueDoesNotExistException e) {
// Ignore, it may be recreated!
// Slow down on the polling though, to avoid tight looping.
// This can be treated similar to an empty queue.
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
} catch (Exception e) {
exceptionHandler.accept(e);
}
}
} finally {
terminated.countDown();
}
}