public ListenableFuture executeAll()

in bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/CongestionController.java [270:348]


    /**
     * Submits the congestion control task and then every mini-batch insert task
     * produced by {@code batchInserter}, throttled by {@code throttleSemaphore}.
     *
     * <p>One permit is acquired on the calling thread before each task is
     * submitted and released from that task's completion callback, so the number
     * of in-flight insert tasks is bounded by the semaphore's permits. Submission
     * stops early when {@code isRunning()} turns false or {@code failed()} turns
     * true. Once every submitted task has finished, the state is set to
     * {@code State.Completed} (all succeeded) or {@code State.Failure}.
     *
     * @return the future of the congestion control task submitted at the start;
     *         note this is NOT the combined future of the insert tasks
     *         (NOTE(review): presumably the congestion control task ends once the
     *         state leaves "running" - confirm against congestionControlTask())
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *         calling thread is interrupted while waiting for a permit; the
     *         thread's interrupt flag is restored before throwing
     */
    public ListenableFuture<Void> executeAll()  {

        logger.debug("pki{} Executing batching", partitionKeyRangeId);

        ListenableFuture<Void> completionFuture = executor.submit(congestionControlTask());

        Iterator<Callable<InsertMetrics>> batchExecutionIterator = batchInserter.miniBatchInsertExecutionCallableIterator();

        List<ListenableFuture<InsertMetrics>> futureList = new ArrayList<>();
        while(batchExecutionIterator.hasNext() && isRunning()) {
            Callable<InsertMetrics> task = batchExecutionIterator.next();

            // Main thread waits on the throttleSem so no more than MaxDegreeOfParallelism Tasks are run at a time.
            try {
                logger.debug("pki {} trying to accequire semaphore to execute a task. available permits {}", partitionKeyRangeId, this.throttleSemaphore.availablePermits());
                this.throttleSemaphore.acquire();
                logger.debug("pki {} accquiring semaphore for executing a task succeeded. available permits {}", partitionKeyRangeId, this.throttleSemaphore.availablePermits());
            } catch (InterruptedException e) {
                // acquire() threw, so no permit was obtained: releasing here would
                // inflate the permit count beyond the configured bound.
                logger.error("pki {} Interrupted while waiting to acquire semaphore", partitionKeyRangeId, e);
                // Restore the interrupt flag so code further up the stack can observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }

            if (failed()) {
                logger.error("pki {} already failed due to earlier failures. not submitting new tasks", partitionKeyRangeId);
                // release the already acquired semaphore
                this.throttleSemaphore.release();
                break;
            }

            ListenableFuture<InsertMetrics> insertMetricsFuture = executor.submit(task);

            // Per-task callback: folds the task's metrics into the aggregate (or
            // records the failure) and always hands the permit back.
            FutureCallback<InsertMetrics> aggregateMetricsReleaseSemaphoreCallback = new FutureCallback<InsertMetrics>() {

                @Override
                public void onSuccess(InsertMetrics result) {
                    logger.debug("pki {} accquiring a synchronized lock to update metrics", partitionKeyRangeId);

                    // Callbacks of different tasks may run concurrently, hence the lock.
                    synchronized (aggregateLock) {
                        aggregatedInsertMetrics = InsertMetrics.sum(aggregatedInsertMetrics, result);
                    }
                    logger.debug("pki {} releasing semaphore on completion of task", partitionKeyRangeId);
                    throttleSemaphore.release();
                }

                @Override
                public void onFailure(Throwable t) {
                    logger.error("pki {} encountered failure {} releasing semaphore", partitionKeyRangeId, t);
                    // if a batch inserter encounters failure which cannot be retried then we have to stop.
                    setState(State.Failure);
                    addFailure(ExceptionUtils.toException(t));
                    throttleSemaphore.release();
                }
            };

            Futures.addCallback(insertMetricsFuture, aggregateMetricsReleaseSemaphoreCallback , MoreExecutors.directExecutor());
            futureList.add(insertMetricsFuture);
        }

        // Combined future over every task that was actually submitted; it fails
        // as soon as any one of them fails.
        ListenableFuture<List<InsertMetrics>> allFutureResults = Futures.allAsList(futureList);

        FutureCallback<List<InsertMetrics>> completionCallback = new FutureCallback<List<InsertMetrics>>() {

            @Override
            public void onSuccess(List<InsertMetrics> result) {
                logger.debug("pki {} importing completed", partitionKeyRangeId);
                setState(State.Completed);
            }

            @Override
            public void onFailure(Throwable t) {
                logger.error("pki {} importing failed", partitionKeyRangeId, t);
                setState(State.Failure);
            }
        };

        Futures.addCallback(allFutureResults, completionCallback, MoreExecutors.directExecutor());
        return completionFuture;
    }