protected void shutDown()

in src/com/amazon/kinesis/streaming/agent/Agent.java [272:347]


    /**
     * Shuts the agent down and releases its resources.
     *
     * <p>The sequence is:
     * <ol>
     *   <li>While holding {@code lock}: ask every flow's tailer, the metrics emitter and the
     *       heartbeat to stop asynchronously, stop accepting new send tasks, then wait for
     *       each tailer to terminate.</li>
     *   <li>Force-shutdown the sending executor and wait for running tasks for whatever is
     *       left of {@code agentContext.shutdownTimeoutMillis()}.</li>
     *   <li>Wait for the heartbeat service to terminate.</li>
     *   <li>Always (in {@code finally}): stop the uptime stopwatch, close the checkpoint
     *       store and emit the final shutdown message.</li>
     * </ol>
     *
     * <p>Exceptions raised by the main shutdown sequence are caught and reported through the
     * logger and {@code System.err}; they do not prevent the cleanup steps from running.
     *
     * @throws Exception if a cleanup step outside the guarded sequence fails, e.g. stopping
     *         the uptime stopwatch or closing the checkpoint store. Each cleanup step is
     *         still attempted before the exception propagates.
     */
    protected void shutDown() throws Exception {
        logger.info("{}: Shutting down...", serviceName());
        Stopwatch shutdownTimer = Stopwatch.createStarted();
        try {
            synchronized (lock) {
                // Stopping all tailers
                for (FileFlow<?> flow : agentContext.flows()) {
                    flow.getTailer().stopAsync();
                }
                // NOTE(review): metricsEmitter is asked to stop here but is never awaited
                // anywhere in this method -- confirm that is intentional.
                metricsEmitter.stopAsync();
                heartbeat.stopAsync();
                sendingExecutor.shutdown(); // no more tasks are accepted, but current tasks will try to finish
                // Waiting for them to finish up... this should take less than agentContext.shutdownTimeoutMillis() (see AsyncPublisher.getShutdownTimeMillis())
                for (FileFlow<?> flow : agentContext.flows()) {
                    try {
                        flow.getTailer().awaitTerminated();
                    } catch (RuntimeException e) {
                        // A tailer that failed to stop must not keep the others from being awaited.
                        logger.warn("{}: Error while waiting for tailer {} to stop during shutdown.",
                                serviceName(), flow.getTailer(), e);
                    }
                }
            }
            // Shutdown executor
            try {
                List<Runnable> tasks = sendingExecutor.shutdownNow();
                logger.debug("{}: There were {} tasks that were not started due to shutdown.",
                        serviceName(), tasks.size());
                // Only wait for whatever is left of the overall shutdown budget.
                long remaining = agentContext.shutdownTimeoutMillis() - shutdownTimer.elapsed(TimeUnit.MILLISECONDS);
                if (remaining > 0) {
                    logger.debug("{}: Waiting up to {} ms for any executing tasks to finish.",
                            serviceName(), remaining);
                    try {
                        if (!sendingExecutor.awaitTermination(remaining, TimeUnit.MILLISECONDS)) {
                            logger.info("{}: Not all executing send tasks finished cleanly by shutdown.",
                                    serviceName());
                        }
                    } catch (InterruptedException e) {
                        // Preserve the interrupt status for whoever called us.
                        Thread.currentThread().interrupt();
                        logger.debug("{}: Interrupted while waiting for executor to shutdown.", serviceName());
                    }
                }
            } catch (Exception e) {
                logger.warn("{}: Error while waiting for executor service to stop during shutdown.",
                        serviceName(), e);
            }
            // Shutdown heartbeats
            try {
                heartbeat.awaitTerminated();
            } catch (Exception e) {
                logger.warn("{}: Error while waiting for heartbeat service to stop during shutdown.",
                        serviceName(), e);
            }
        } catch (Exception e) {
            String msg = String.format("%s: Unhandled exception during shutdown.", serviceName());
            try {   // We don't know if logging is still working
                logger.error(msg, e);
            } finally {
                System.err.println(msg);
                e.printStackTrace();
            }
        } finally {
            // The cleanup steps are nested in try/finally so that a failure in an earlier
            // step cannot skip a later one: the checkpoint store is always closed and the
            // final message is always emitted. Any exception still propagates afterwards.
            try {
                uptime.stop();
            } finally {
                try {
                    // Cleanly close the checkpoint store
                    checkpoints.close();
                } finally {
                    // Print final message
                    String msg = String.format("%s: Shut down completed in %d ms. Uptime: %d ms",
                            serviceName(), shutdownTimer.elapsed(TimeUnit.MILLISECONDS),
                            uptime.elapsed(TimeUnit.MILLISECONDS));
                    try {   // We don't know if logging is still working
                        logger.info(msg);
                    } catch (Exception e) {
                        System.err.println(msg);
                        e.printStackTrace();
                    }
                }
            }
        }
    }