in elastic-db-tools/src/main/java/com/microsoft/azure/elasticdb/query/multishard/MultiShardStatement.java [624:676]
private Stream<LabeledResultSet> executeAsync(int numberOfThreads,
Stream<Callable<LabeledResultSet>> callables,
MultiShardExecutionPolicy executionPolicy) throws SQLException, MultiShardException {
ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
try {
// CompletionService allows to terminate the parallel execution if one of the treads throws
// an exception
CompletionService<LabeledResultSet> completionService = new ExecutorCompletionService<>(executorService);
List<Future<LabeledResultSet>> futures = callables.map(completionService::submit).collect(Collectors.toList());
// Looping over the futures in order of completion: the first future to
// complete (or fail) is returned first by .take()
List<LabeledResultSet> resultSets = new ArrayList<>();
for (int i = 0; i < futures.size(); ++i) {
try {
this.currentTask = completionService.take();
resultSets.add(this.currentTask.get());
}
catch (Exception e) {
if (e.getCause() instanceof MultiShardException) {
MultiShardException ex = (MultiShardException) e.getCause();
ShardLocation loc = ex.getShardLocation();
if (this.currentTask.isCancelled()) {
log.info("MultiShardStatement.GetLabeledDbDataReaderTask; Command Cancelled;");
// Raise the shardExecutionCanceled event.
this.onShardExecutionCanceled(loc);
}
else {
log.info("MultiShardStatement.GetLabeledDbDataReaderTask; Command Failed");
// Raise the shardExecutionFaulted event.
this.onShardExecutionFaulted(loc, (Exception) e.getCause());
}
if (executionPolicy.equals(MultiShardExecutionPolicy.CompleteResults)) {
// In case one callable fails, cancel all pending and executing operations.
futures.forEach(f -> f.cancel(true));
throw ex;
}
resultSets.add(new LabeledResultSet(ex, loc, getConnectionForLocation(loc).prepareStatement(this.commandText)));
}
}
}
return resultSets.stream();
}
finally {
executorService.shutdown();
}
}