in amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java [160:204]
/**
 * Submits a new {@link ShardSyncTask} unless a previously submitted one is still running.
 *
 * <p>If there is no previous future, or the previous future has finished (normally,
 * exceptionally, or by cancellation), the outcome of the previous run is logged, a fresh
 * task is created and handed to {@code executorService}, and {@code future} is replaced.
 * Otherwise no task is submitted and {@code shardSyncRequestPending} is set to {@code true}.
 *
 * @return {@code true} if a new task was submitted by this call, {@code false} if a
 *         previous task was still pending
 */
private boolean checkAndSubmitNextTask() {
    boolean submittedNewTask = false;
    if ((future == null) || future.isCancelled() || future.isDone()) {
        if ((future != null) && future.isDone()) {
            // Once isDone() is true the future's state is terminal, so isCancelled() is stable here.
            if (future.isCancelled()) {
                // get() on a cancelled future throws the unchecked CancellationException, which
                // would escape this method and prevent the next task from being submitted.
                log.warn("Previous {} task was cancelled.", currentTask.taskType());
            } else {
                try {
                    TaskResult result = future.get();
                    if (result.getException() != null) {
                        log.error("Caught exception running {} task: ", currentTask.taskType(),
                                result.getException());
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt status so callers further up the stack can observe it.
                    Thread.currentThread().interrupt();
                    log.warn("{} task encountered exception.", currentTask.taskType(), e);
                } catch (ExecutionException e) {
                    log.warn("{} task encountered exception.", currentTask.taskType(), e);
                }
            }
        }
        currentTask =
                new MetricsCollectingTaskDecorator(
                        new ShardSyncTask(shardDetector,
                                leaseRefresher,
                                initialPositionInStream,
                                cleanupLeasesUponShardCompletion,
                                garbageCollectLeases,
                                ignoreUnexpectedChildShards,
                                shardSyncIdleTimeMillis,
                                hierarchicalShardSyncer,
                                metricsFactory),
                        metricsFactory);
        // handlePendingShardSyncs is invoked on completion with the task's result or failure.
        future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService)
                .whenComplete((taskResult, exception) -> handlePendingShardSyncs(exception, taskResult));
        log.info(new ExecutorStateEvent(executorService).message());
        submittedNewTask = true;
        if (log.isDebugEnabled()) {
            log.debug("Submitted new {} task.", currentTask.taskType());
        }
    } else {
        if (log.isDebugEnabled()) {
            log.debug("Previous {} task still pending. Not submitting new task. "
                    + "Enqueued a request that will be executed when the current request completes.", currentTask.taskType());
        }
        // Record that another sync was requested while one is in flight; a no-op if already set.
        shardSyncRequestPending.compareAndSet(false /*expected*/, true /*update*/);
    }
    return submittedNewTask;
}