in core/src/main/java/org/jclouds/concurrent/FutureIterables.java [106:166]
public static <F> Map<F, Exception> awaitCompletion(Map<F, ? extends ListenableFuture<?>> responses,
ListeningExecutorService exec, @Nullable Long maxTime, final Logger logger, final String logPrefix)
throws TimeoutException {
final ConcurrentMap<F, Exception> errorMap = newConcurrentMap();
if (responses.isEmpty())
return errorMap;
final int total = responses.size();
final CountDownLatch doneSignal = new CountDownLatch(total);
final AtomicInteger complete = new AtomicInteger(0);
final AtomicInteger errors = new AtomicInteger(0);
final long start = System.currentTimeMillis();
for (final Entry<F, ? extends ListenableFuture<?>> future : responses.entrySet()) {
future.getValue().addListener(new Runnable() {
@Override
public void run() {
try {
future.getValue().get();
complete.incrementAndGet();
} catch (Exception e) {
errors.incrementAndGet();
logException(logger, logPrefix, total, complete.get(), errors.get(), start, e);
errorMap.put(future.getKey(), e);
} finally {
doneSignal.countDown();
}
}
@Override
public String toString() {
return "callGetOnFuture(" + future.getKey() + "," + future.getValue() + ")";
}
}, exec);
}
try {
if (maxTime != null) {
if (!doneSignal.await(maxTime, TimeUnit.MILLISECONDS)) {
String message = message(logPrefix, total, complete.get(), errors.get(), start);
TimeoutException te = new TimeoutException(message);
logger.error(te, message);
throw te;
}
} else {
doneSignal.await();
}
if (errors.get() > 0) {
String message = message(logPrefix, total, complete.get(), errors.get(), start);
RuntimeException exception = new RuntimeException(message);
logger.error(exception, message);
}
if (logger.isTraceEnabled()) {
String message = message(logPrefix, total, complete.get(), errors.get(), start);
logger.trace(message);
}
} catch (InterruptedException ie) {
String message = message(logPrefix, total, complete.get(), errors.get(), start);
logger.error(ie, message);
throw propagate(ie);
}
return errorMap;
}