in amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/GracefulShutdownCoordinator.java [65:126]
private boolean waitForRecordProcessors(GracefulShutdownContext context) {
//
// Awaiting for all ShardConsumer/RecordProcessors to be notified that a shutdown has been requested.
// There is the possibility of a race condition where a lease is terminated after the shutdown request
// notification is started, but before the ShardConsumer is sent the notification. In this case the
// ShardConsumer would start the lease loss shutdown, and may never call the notification methods.
//
try {
while (!context.notificationCompleteLatch().await(1, TimeUnit.SECONDS)) {
if (Thread.interrupted()) {
throw new InterruptedException();
}
log.info(awaitingLogMessage(context));
if (workerShutdownWithRemaining(context.shutdownCompleteLatch().getCount(), context)) {
return false;
}
}
} catch (InterruptedException ie) {
log.warn("Interrupted while waiting for notification complete, terminating shutdown. {}",
awaitingLogMessage(context));
return false;
}
if (Thread.interrupted()) {
log.warn("Interrupted before worker shutdown, terminating shutdown");
return false;
}
//
// Once all record processors have been notified of the shutdown it is safe to allow the worker to
// start its shutdown behavior. Once shutdown starts it will stop renewer, and drop any remaining leases.
//
context.scheduler().shutdown();
if (Thread.interrupted()) {
log.warn("Interrupted after worker shutdown, terminating shutdown");
return false;
}
//
// Want to wait for all the remaining ShardConsumers/ShardRecordProcessor's to complete their final shutdown
// processing. This should really be a no-op since as part of the notification completion the lease for
// ShardConsumer is terminated.
//
try {
while (!context.shutdownCompleteLatch().await(1, TimeUnit.SECONDS)) {
if (Thread.interrupted()) {
throw new InterruptedException();
}
log.info(awaitingFinalShutdownMessage(context));
if (workerShutdownWithRemaining(context.shutdownCompleteLatch().getCount(), context)) {
return false;
}
}
} catch (InterruptedException ie) {
log.warn("Interrupted while waiting for shutdown completion, terminating shutdown. {}",
awaitingFinalShutdownMessage(context));
return false;
}
return true;
}