in src/com/amazon/kinesis/streaming/agent/Agent.java [55:118]
/**
 * Command-line entry point for the agent.
 *
 * <p>Parses the command-line options, loads the configuration (main file plus
 * configuration directory), installs a process-wide uncaught-exception handler
 * that terminates the JVM with exit code 1, then builds the agent, registers a
 * shutdown hook for it, starts it and blocks until it has terminated.
 *
 * <p>Any {@link Exception} raised while loading configuration or running the
 * agent is logged, echoed to standard error, and turned into exit code 1.
 *
 * @param args raw command-line arguments, handed to {@code AgentOptions.parse}
 * @throws Exception if option parsing or the first configuration read attempt
 *         fails; those statements run outside the guarded section below
 */
public static void main(String[] args) throws Exception {
    final AgentOptions options = AgentOptions.parse(args);
    final String configFile = options.getConfigFile();
    // First, best-effort read; a null result is retried inside the guarded section below.
    AgentConfiguration configuration = tryReadConfigurationFile(Paths.get(options.getConfigFile()));
    final Logger logger = LoggerFactory.getLogger(Agent.class);

    // Last line of defence: any throwable escaping any thread aborts the process.
    final Thread.UncaughtExceptionHandler fatalHandler = new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread thread, Throwable error) {
            final boolean jvmStateUntrusted =
                    error instanceof VirtualMachineError || error instanceof LinkageError;
            if (jvmStateUntrusted) {
                // After an OOME (or other VM error) a graceful stop could hang the JVM, and
                // after a LinkageError the JVM state cannot be trusted; this flag makes the
                // shutdown hook skip the graceful stop.
                dontShutdownOnExit = true;
            }
            final String fatalMessage =
                    "FATAL: Thread " + thread.getName() + " threw an unrecoverable error. Aborting application";
            try {
                try {
                    // Logging may itself be broken at this point ...
                    logger.error(fatalMessage, error);
                } finally {
                    // ... so always report on standard error as well.
                    System.err.println(fatalMessage);
                    error.printStackTrace();
                }
            } finally {
                // Exit no matter what happened while reporting.
                System.exit(1);
            }
        }
    };
    Thread.setDefaultUncaughtExceptionHandler(fatalHandler);

    try {
        logger.info("Reading configuration from file: {}", configFile);
        if (configuration == null) {
            configuration = readConfigurationFile(Paths.get(options.getConfigFile()));
        }
        // Layer the configuration directory on top of the main configuration file.
        configuration = readConfigurationDirectory(configuration);

        // Build the agent; refuse to start when nothing is configured to flow.
        final AgentContext context = new AgentContext(configuration);
        if (context.flows().isEmpty()) {
            throw new ConfigurationException("There are no flows configured in configuration file.");
        }
        final Agent agent = new Agent(context);

        // When the process is killed, stop the agent and wait for it to finish.
        final Thread shutdownHook = new Thread() {
            @Override
            public void run() {
                if (dontShutdownOnExit || !agent.isRunning()) {
                    return;
                }
                agent.stopAsync();
                agent.awaitTerminated();
            }
        };
        Runtime.getRuntime().addShutdownHook(shutdownHook);

        // Start the agent and block the main thread until it has terminated.
        agent.startAsync();
        agent.awaitRunning();
        agent.awaitTerminated();
    } catch (Exception failure) {
        logger.error("Unhandled error.", failure);
        System.err.println("Unhandled error.");
        failure.printStackTrace();
        System.exit(1);
    }
}