private boolean waitForRecordProcessors()

in amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/GracefulShutdownCoordinator.java [65:126]


        /**
         * Drives the waiting phases of a graceful shutdown: waits for the notification-complete latch, asks the
         * scheduler to shut down, then waits for the shutdown-complete latch.
         *
         * <p>Interrupts are detected with {@link Thread#interrupted()}, which clears the calling thread's
         * interrupt flag; the flag is not set again before this method returns.
         *
         * @param context shutdown context supplying the two latches and the scheduler
         * @return {@code true} only if both latches were released; {@code false} if the thread was interrupted at
         *         any step, or if {@code workerShutdownWithRemaining} returned {@code true} while waiting
         */
        private boolean waitForRecordProcessors(GracefulShutdownContext context) {
            // Phase 1: every ShardConsumer/RecordProcessor must be told that a shutdown was requested.
            if (!awaitNotificationCompletion(context)) {
                return false;
            }

            if (Thread.interrupted()) {
                log.warn("Interrupted before worker shutdown, terminating shutdown");
                return false;
            }

            // With all record processors notified, the worker may begin its own shutdown; from that point it
            // stops the renewer and drops whatever leases remain.
            context.scheduler().shutdown();

            if (Thread.interrupted()) {
                log.warn("Interrupted after worker shutdown, terminating shutdown");
                return false;
            }

            // Phase 2: let the remaining ShardConsumers/ShardRecordProcessors finish their final shutdown work.
            return awaitFinalShutdownCompletion(context);
        }

        /**
         * Polls the notification-complete latch in one-second slices until it is released.
         *
         * <p>A lease can be terminated after the shutdown notification has started but before its ShardConsumer
         * receives it; that consumer then runs the lease-loss shutdown and may never call the notification
         * methods. To avoid waiting forever in that case, each slice also consults
         * {@code workerShutdownWithRemaining} with the current shutdown-complete latch count.
         *
         * @param context shutdown context supplying the latches
         * @return {@code true} once the latch is released; {@code false} on interruption or when
         *         {@code workerShutdownWithRemaining} returns {@code true}
         */
        private boolean awaitNotificationCompletion(GracefulShutdownContext context) {
            try {
                while (true) {
                    boolean notified = context.notificationCompleteLatch().await(1, TimeUnit.SECONDS);
                    if (notified) {
                        return true;
                    }
                    // Route an interrupt that arrived outside await() through the same handler below.
                    if (Thread.interrupted()) {
                        throw new InterruptedException();
                    }
                    log.info(awaitingLogMessage(context));
                    long outstanding = context.shutdownCompleteLatch().getCount();
                    if (workerShutdownWithRemaining(outstanding, context)) {
                        return false;
                    }
                }
            } catch (InterruptedException interrupted) {
                log.warn("Interrupted while waiting for notification complete, terminating shutdown. {}",
                        awaitingLogMessage(context));
                return false;
            }
        }

        /**
         * Polls the shutdown-complete latch in one-second slices until it is released.
         *
         * <p>This is expected to be close to a no-op, because the lease for a ShardConsumer is terminated as part
         * of notification completion.
         *
         * @param context shutdown context supplying the latch
         * @return {@code true} once the latch is released; {@code false} on interruption or when
         *         {@code workerShutdownWithRemaining} returns {@code true}
         */
        private boolean awaitFinalShutdownCompletion(GracefulShutdownContext context) {
            try {
                while (true) {
                    boolean finished = context.shutdownCompleteLatch().await(1, TimeUnit.SECONDS);
                    if (finished) {
                        return true;
                    }
                    // Route an interrupt that arrived outside await() through the same handler below.
                    if (Thread.interrupted()) {
                        throw new InterruptedException();
                    }
                    log.info(awaitingFinalShutdownMessage(context));
                    long outstanding = context.shutdownCompleteLatch().getCount();
                    if (workerShutdownWithRemaining(outstanding, context)) {
                        return false;
                    }
                }
            } catch (InterruptedException interrupted) {
                log.warn("Interrupted while waiting for shutdown completion, terminating shutdown. {}",
                        awaitingFinalShutdownMessage(context));
                return false;
            }
        }