in bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/CongestionController.java [270:348]
public ListenableFuture<Void> executeAll() {
logger.debug("pki{} Executing batching", partitionKeyRangeId);
ListenableFuture<Void> completionFuture = executor.submit(congestionControlTask());
Iterator<Callable<InsertMetrics>> batchExecutionIterator = batchInserter.miniBatchInsertExecutionCallableIterator();
List<ListenableFuture<InsertMetrics>> futureList = new ArrayList<>();
while(batchExecutionIterator.hasNext() && isRunning()) {
Callable<InsertMetrics> task = batchExecutionIterator.next();
// Main thread waits on the throttleSem so no more than MaxDegreeOfParallelism Tasks are run at a time.
try {
logger.debug("pki {} trying to accequire semaphore to execute a task. available permits {}", partitionKeyRangeId, this.throttleSemaphore.availablePermits());
this.throttleSemaphore.acquire();
logger.debug("pki {} accquiring semaphore for executing a task succeeded. available permits {}", partitionKeyRangeId, this.throttleSemaphore.availablePermits());
} catch (InterruptedException e) {
logger.error("pki {} Interrupted, releasing semaphore", partitionKeyRangeId, e);
this.throttleSemaphore.release();
throw new RuntimeException(e);
}
if (failed()) {
logger.error("pki {} already failed due to earlier failures. not submitting new tasks", partitionKeyRangeId);
// release the already acquired semaphore
this.throttleSemaphore.release();
break;
}
ListenableFuture<InsertMetrics> insertMetricsFuture = executor.submit(task);
FutureCallback<InsertMetrics> aggregateMetricsReleaseSemaphoreCallback = new FutureCallback<InsertMetrics>() {
@Override
public void onSuccess(InsertMetrics result) {
logger.debug("pki {} accquiring a synchronized lock to update metrics", partitionKeyRangeId);
synchronized (aggregateLock) {
aggregatedInsertMetrics = InsertMetrics.sum(aggregatedInsertMetrics, result);
}
logger.debug("pki {} releasing semaphore on completion of task", partitionKeyRangeId);
throttleSemaphore.release();
}
@Override
public void onFailure(Throwable t) {
logger.error("pki {} encountered failure {} releasing semaphore", partitionKeyRangeId, t);
// if a batch inserter encounters failure which cannot be retried then we have to stop.
setState(State.Failure);
addFailure(ExceptionUtils.toException(t));
throttleSemaphore.release();
}
};
Futures.addCallback(insertMetricsFuture, aggregateMetricsReleaseSemaphoreCallback , MoreExecutors.directExecutor());
futureList.add(insertMetricsFuture);
}
ListenableFuture<List<InsertMetrics>> allFutureResults = Futures.allAsList(futureList);
FutureCallback<List<InsertMetrics>> completionCallback = new FutureCallback<List<InsertMetrics>>() {
@Override
public void onSuccess(List<InsertMetrics> result) {
logger.debug("pki {} importing completed", partitionKeyRangeId);
setState(State.Completed);
}
@Override
public void onFailure(Throwable t) {
logger.error("pki {} importing failed", partitionKeyRangeId, t);
setState(State.Failure);
}
};
Futures.addCallback(allFutureResults, completionCallback, MoreExecutors.directExecutor());
return completionFuture;
}