private Stream executeAsync()

in elastic-db-tools/src/main/java/com/microsoft/azure/elasticdb/query/multishard/MultiShardStatement.java [624:676]


    /**
     * Runs the given per-shard callables on a fixed-size thread pool and collects their results in
     * order of completion.
     *
     * <p>When a callable fails with a {@link MultiShardException}, the shardExecutionCanceled or
     * shardExecutionFaulted event is raised for the exception's shard location. Under
     * {@code MultiShardExecutionPolicy.CompleteResults} all futures are then cancelled and the
     * exception is rethrown; under any other policy a {@link LabeledResultSet} built from the
     * exception, its shard location and a statement prepared from {@code commandText} is added to
     * the results instead.
     *
     * <p>A failure that is not a {@link MultiShardException} carries no shard location. It is
     * logged, and under {@code CompleteResults} the remaining work is cancelled and the failure is
     * propagated; under any other policy the shard is left out of the results.
     *
     * <p>The executor is always shut down before this method returns or throws.
     *
     * @param numberOfThreads size of the thread pool used to run the callables
     * @param callables one callable per shard command to execute; the stream is consumed here
     * @param executionPolicy decides whether a failure aborts the whole execution; a
     *        {@code null} value is treated like a policy other than {@code CompleteResults}
     * @return a stream over the collected results, in order of completion
     * @throws SQLException if the calling thread is interrupted while waiting, if a callable
     *         fails with a non-{@link MultiShardException} error under {@code CompleteResults},
     *         or if building the placeholder statement for a failed shard throws it
     * @throws MultiShardException if a callable fails with it under {@code CompleteResults}
     */
    private Stream<LabeledResultSet> executeAsync(int numberOfThreads,
            Stream<Callable<LabeledResultSet>> callables,
            MultiShardExecutionPolicy executionPolicy) throws SQLException, MultiShardException {
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);

        try {
            // CompletionService hands back futures as they finish, which allows terminating the
            // parallel execution as soon as one of the threads throws an exception.
            CompletionService<LabeledResultSet> completionService = new ExecutorCompletionService<>(executorService);

            List<Future<LabeledResultSet>> futures = callables.map(completionService::submit).collect(Collectors.toList());

            // Constant-first equals keeps the comparison safe when executionPolicy is null.
            boolean completeResultsRequired = MultiShardExecutionPolicy.CompleteResults.equals(executionPolicy);

            // Looping over the futures in order of completion: the first future to
            // complete (or fail) is returned first by .take()
            List<LabeledResultSet> resultSets = new ArrayList<>(futures.size());
            for (int i = 0; i < futures.size(); ++i) {
                try {
                    this.currentTask = completionService.take();
                    resultSets.add(this.currentTask.get());
                }
                catch (InterruptedException e) {
                    // Stop the outstanding work, restore the interrupt status for callers further
                    // up the stack and abort instead of blocking on take() again.
                    futures.forEach(f -> f.cancel(true));
                    Thread.currentThread().interrupt();
                    throw new SQLException("Interrupted while waiting for multi-shard command execution to complete.", e);
                }
                catch (Exception e) {
                    Throwable cause = e.getCause();
                    if (cause instanceof MultiShardException) {
                        MultiShardException ex = (MultiShardException) cause;
                        ShardLocation loc = ex.getShardLocation();
                        // NOTE(review): a future that completed exceptionally normally does not
                        // report isCancelled(); confirm whether this branch is reachable.
                        if (this.currentTask.isCancelled()) {
                            log.info("MultiShardStatement.GetLabeledDbDataReaderTask; Command Cancelled;");

                            // Raise the shardExecutionCanceled event.
                            this.onShardExecutionCanceled(loc);
                        }
                        else {
                            log.info("MultiShardStatement.GetLabeledDbDataReaderTask; Command Failed");

                            // Raise the shardExecutionFaulted event.
                            this.onShardExecutionFaulted(loc, (Exception) ex);
                        }

                        if (completeResultsRequired) {
                            // In case one callable fails, cancel all pending and executing operations.
                            futures.forEach(f -> f.cancel(true));
                            throw ex;
                        }
                        resultSets.add(new LabeledResultSet(ex, loc, getConnectionForLocation(loc).prepareStatement(this.commandText)));
                    }
                    else if (e instanceof java.util.concurrent.CancellationException) {
                        // A cancelled task has no result and no shard location to report; as
                        // before it is skipped, but no longer without a trace in the log.
                        log.info("MultiShardStatement.GetLabeledDbDataReaderTask; Command Cancelled; no shard location available");
                    }
                    else {
                        // Failure without a MultiShardException: there is no shard location to
                        // attach, so the faulted event and placeholder result cannot be produced.
                        Throwable failure = (cause != null) ? cause : e;
                        log.info("MultiShardStatement.GetLabeledDbDataReaderTask; Command Failed with unexpected error: " + failure);

                        if (completeResultsRequired) {
                            // Returning normally here would hand back an incomplete result set
                            // under a policy that demands complete results.
                            futures.forEach(f -> f.cancel(true));
                            if (failure instanceof SQLException) {
                                throw (SQLException) failure;
                            }
                            if (failure instanceof RuntimeException) {
                                throw (RuntimeException) failure;
                            }
                            if (failure instanceof Error) {
                                throw (Error) failure;
                            }
                            throw new SQLException("Multi-shard command execution failed.", failure);
                        }
                    }
                }
            }

            return resultSets.stream();
        }
        finally {
            executorService.shutdown();
        }
    }