in src/main/java/com/amazonaws/services/kinesis/scaling/auto/AutoscalingController.java [120:167]
/**
 * Creates a {@link StreamMonitor} for every configured stream, submits each one to the
 * executor, and then blocks while polling the submitted futures once a minute.
 *
 * <p>The method returns only after one of the following happens:
 * <ul>
 * <li>a submitted future is {@code null},</li>
 * <li>a completed monitor reports a non-null {@code getException()}, or</li>
 * <li>the calling thread is interrupted while sleeping between polls.</li>
 * </ul>
 * In each case the reason is logged, {@code stopAll()} is invoked, and the executor is shut
 * down. A monitor that completes without reporting an exception is not treated as a failure,
 * so polling continues.
 *
 * <p>No exception is propagated to the caller; failures during start-up or shutdown are logged.
 * If the thread was interrupted, its interrupt status is restored before returning.
 */
public void startMonitors() {
    // run all the configured monitors in a thread pool
    int i = 0;
    for (AutoscalingConfiguration streamConfig : this.config) {
        try {
            LOG.info(String.format("AutoscalingController creating Stream Monitor for Stream %s",
                    streamConfig.getStreamName()));
            StreamMonitor monitor = new StreamMonitor(streamConfig);
            runningMonitors.put(i, monitor);
            monitorFutures.put(i, executor.submit(monitor));
            i++;
        } catch (Exception e) {
            // a monitor that cannot be created or submitted is skipped; the others still run
            LOG.error(e.getMessage(), e);
        }
    }

    // spin through all stream monitors to see if any failed
    Throwable reason = null;
    boolean interrupted = false;
    try {
        watch:
        while (true) {
            for (Map.Entry<Integer, Future<?>> entry : monitorFutures.entrySet()) {
                Future<?> future = entry.getValue();
                if (future == null) {
                    reason = new IllegalStateException("Null Monitor Future");
                    break watch;
                }
                if (future.isDone()) {
                    StreamMonitor monitor = runningMonitors.get(entry.getKey());
                    // keep the monitor's own exception so its stack trace reaches the log
                    Throwable failure = (monitor == null) ? null : monitor.getException();
                    if (failure != null) {
                        reason = failure;
                        break watch;
                    }
                }
            }
            Thread.sleep(60000);
        }
    } catch (InterruptedException e) {
        // the flag is restored after cleanup so that stopAll() does not run on a thread
        // that is already marked as interrupted
        interrupted = true;
        reason = e;
    }

    // log the trigger first so it is not lost if the shutdown steps fail
    LOG.error(reason.getMessage(), reason);
    try {
        stopAll();
    } catch (Exception e1) {
        LOG.error(e1.getMessage(), e1);
    }
    // stop the executor service even when stopAll() failed, so the pool is not leaked
    try {
        LOG.info("Terminating Thread Pool");
        executor.shutdown();
    } catch (Exception e1) {
        LOG.error(e1.getMessage(), e1);
    }
    if (interrupted) {
        Thread.currentThread().interrupt();
    }
}