in src/com/amazon/kinesis/streaming/agent/Agent.java [272:347]
protected void shutDown() throws Exception {
    // Shutdown sequence implemented below:
    //   1. While holding `lock`: request an asynchronous stop of every flow's tailer, of the
    //      metrics emitter and of the heartbeat, stop the sending executor from accepting new
    //      tasks, then block until each tailer has terminated.
    //   2. Call shutdownNow() on the sending executor and wait for it to terminate, for at most
    //      whatever is left of agentContext.shutdownTimeoutMillis() since this method started.
    //   3. Wait for the heartbeat service to terminate.
    //   4. Always (finally): stop the uptime timer, close the checkpoint store and emit the
    //      final message.
    // An exception escaping steps 1-3 is reported (logger, then stderr) instead of being
    // rethrown; an exception raised by the cleanup in step 4 propagates to the caller.
    logger.info("{}: Shutting down...", serviceName());
    Stopwatch shutdownTimer = Stopwatch.createStarted();
    try {
        synchronized (lock) {
            // Stopping all tailers
            for (FileFlow<?> flow : agentContext.flows()) {
                flow.getTailer().stopAsync();
            }
            // NOTE(review): metricsEmitter is asked to stop here but this method never waits
            // for it to terminate -- confirm that this is intentional.
            metricsEmitter.stopAsync();
            heartbeat.stopAsync();
            sendingExecutor.shutdown(); // no more tasks are accepted, but current tasks will try to finish
            // Waiting for them to finish up... this should take less than
            // agentContext.shutdownTimeoutMillis() (see AsyncPublisher.getShutdownTimeMillis()).
            // NOTE(review): awaitTerminated() is called without a timeout, so this method itself
            // does not bound the wait; it relies on the tailers stopping in time.
            for (FileFlow<?> flow : agentContext.flows()) {
                try {
                    flow.getTailer().awaitTerminated();
                } catch (RuntimeException e) {
                    // A tailer that failed to stop must not prevent waiting on the other ones.
                    logger.warn("{}: Error while waiting for tailer {} to stop during shutdown.",
                            serviceName(), flow.getTailer(), e);
                }
            }
        }
        // Shutdown executor
        try {
            List<Runnable> tasks = sendingExecutor.shutdownNow();
            logger.debug("{}: There were {} tasks that were not started due to shutdown.",
                    serviceName(), tasks.size());
            // Only the part of the shutdown budget not already consumed above is spent here.
            long remaining = agentContext.shutdownTimeoutMillis() - shutdownTimer.elapsed(TimeUnit.MILLISECONDS);
            if (remaining > 0) {
                logger.debug("{}: Waiting up to {} ms for any executing tasks to finish.",
                        serviceName(), remaining);
                try {
                    if (!sendingExecutor.awaitTermination(remaining, TimeUnit.MILLISECONDS)) {
                        logger.info("{}: Not all executing send tasks finished cleanly by shutdown.",
                                serviceName());
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so code further up the stack can observe it.
                    Thread.currentThread().interrupt();
                    logger.debug("{}: Interrupted while waiting for executor to shutdown.", serviceName());
                }
            }
        } catch (Exception e) {
            logger.warn("{}: Error while waiting for executor service to stop during shutdown.",
                    serviceName(), e);
        }
        // Shutdown heartbeats
        try {
            heartbeat.awaitTerminated();
        } catch (Exception e) {
            logger.warn("{}: Error while waiting for heartbeat service to stop during shutdown.",
                    serviceName(), e);
        }
    } catch (Exception e) {
        String msg = String.format("%s: Unhandled exception during shutdown.", serviceName());
        try { // We don't know if logging is still working
            logger.error(msg, e);
        } finally {
            System.err.println(msg);
            e.printStackTrace();
        }
    } finally {
        // The cleanup steps are chained with try/finally so that a failure in one of them
        // cannot prevent the following ones from running (in particular, the checkpoint store
        // is closed even if stopping the uptime timer throws). Exceptions still propagate.
        try {
            uptime.stop();
        } finally {
            try {
                // Cleanly close the checkpoint store
                checkpoints.close();
            } finally {
                // Print final message
                String msg = String.format("%s: Shut down completed in %d ms. Uptime: %d ms",
                        serviceName(), shutdownTimer.elapsed(TimeUnit.MILLISECONDS),
                        uptime.elapsed(TimeUnit.MILLISECONDS));
                try { // We don't know if logging is still working
                    logger.info(msg);
                } catch (Exception e) {
                    System.err.println(msg);
                    e.printStackTrace();
                }
            }
        }
    }
}