in bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/DocumentBulkImporter.java [442:556]
    private ListenableFuture<BulkImportResponse> executeBulkImportAsyncImpl(Collection<String> documents,
            boolean isUpsert) throws Exception {

        Stopwatch watch = Stopwatch.createStarted();
        BulkImportStoredProcedureOptions options = new BulkImportStoredProcedureOptions(true, true, null, false, isUpsert);

        logger.debug("Bucketing documents ...");

        // Bucket the documents by partition key range id; each bucket is later split into mini batches.
        ConcurrentHashMap<String, Set<String>> documentsToImportByPartition = new ConcurrentHashMap<>();
        ConcurrentHashMap<String, List<List<String>>> miniBatchesToImportByPartition = new ConcurrentHashMap<>();

        for (String partitionKeyRangeId : partitionKeyRangeIds) {
            documentsToImportByPartition.put(partitionKeyRangeId,
                    ConcurrentHashMap.newKeySet(documents.size() / partitionKeyRangeIds.size()));
            miniBatchesToImportByPartition.put(partitionKeyRangeId, new ArrayList<>(1000));
        }
        // Compute the effective partition key of each document and route it to its owning partition key range.
        documents.parallelStream().forEach(documentAsString -> {
            PartitionKeyInternal partitionKeyValue = DocumentAnalyzer.extractPartitionKeyValue(documentAsString, partitionKeyDefinition);
            String effectivePartitionKey = partitionKeyValue.getEffectivePartitionKeyString(partitionKeyDefinition, true);
            String partitionRangeId = collectionRoutingMap.getRangeByEffectivePartitionKey(effectivePartitionKey).getId();
            documentsToImportByPartition.get(partitionRangeId).add(documentAsString);
        });
        logger.trace("Creating mini batches within each partition bucket");

        // Within each partition bucket, pack documents into mini batches whose serialized size
        // stays at or below maxMiniBatchSize; each mini batch is sent as one stored procedure call.
        documentsToImportByPartition.entrySet().parallelStream().forEach(entry -> {
            String partitionRangeId = entry.getKey();
            Set<String> documentsToImportInPartition = entry.getValue();

            Iterator<String> it = documentsToImportInPartition.iterator();
            ArrayList<String> currentMiniBatch = new ArrayList<>(500);
            int currentMiniBatchSize = 0;

            while (it.hasNext()) {
                String currentDocument = it.next();
                int currentDocumentSize = getDocumentSizeOrThrow(currentDocument);

                if (currentMiniBatchSize + currentDocumentSize <= maxMiniBatchSize) {
                    // the document still fits; add it to the current batch
                    currentMiniBatch.add(currentDocument);
                    currentMiniBatchSize += currentDocumentSize;
                } else {
                    // the current batch has reached its max size; seal it and start a new one with this document
                    miniBatchesToImportByPartition.get(partitionRangeId).add(currentMiniBatch);
                    currentMiniBatch = new ArrayList<>(500);
                    currentMiniBatch.add(currentDocument);
                    currentMiniBatchSize = currentDocumentSize;
                }
            }

            if (!currentMiniBatch.isEmpty()) {
                // add the last, partially filled mini batch
                miniBatchesToImportByPartition.get(partitionRangeId).add(currentMiniBatch);
            }
        });
logger.debug("Beginning bulk import within each partition bucket");
Map<String, BatchInserter> batchInserters = new HashMap<String, BatchInserter>();
Map<String, CongestionController> congestionControllers = new HashMap<String, CongestionController>();
logger.debug("Preprocessing took: " + watch.elapsed().toMillis() + " millis");
List<ListenableFuture<Void>> futures = new ArrayList<>();
for (String partitionKeyRangeId: this.partitionKeyRangeIds) {
BatchInserter batchInserter = new BatchInserter(
partitionKeyRangeId,
miniBatchesToImportByPartition.get(partitionKeyRangeId),
this.client,
bulkImportStoredProcLink,
options);
batchInserters.put(partitionKeyRangeId, batchInserter);
CongestionController cc = new CongestionController(listeningExecutorService,
collectionThroughput / partitionKeyRangeIds.size(),
partitionKeyRangeId,
batchInserter,
partitionKeyRangeIdToInferredDegreeOfParallelism.get(partitionKeyRangeId));
congestionControllers.put(partitionKeyRangeId,cc);
// starting
futures.add(cc.executeAllAsync());
}
        // Once all partitions have completed, aggregate failures, document counts and RU charges into a
        // single response, and record each partition's achieved degree of concurrency so later imports
        // can start from the inferred degree of parallelism.
        FutureCombiner<Void> futureContainer = Futures.whenAllComplete(futures);
        AsyncCallable<BulkImportResponse> completeAsyncCallback = new AsyncCallable<BulkImportResponse>() {

            @Override
            public ListenableFuture<BulkImportResponse> call() throws Exception {

                List<Exception> failures = new ArrayList<>();
                for (String partitionKeyRangeId : partitionKeyRangeIds) {
                    CongestionController cc = congestionControllers.get(partitionKeyRangeId);
                    failures.addAll(cc.getFailures());
                    partitionKeyRangeIdToInferredDegreeOfParallelism.put(partitionKeyRangeId, cc.getDegreeOfConcurrency());
                }

                int numberOfDocumentsImported = batchInserters.values().stream().mapToInt(b -> b.getNumberOfDocumentsImported()).sum();
                double totalRequestUnitsConsumed = batchInserters.values().stream().mapToDouble(b -> b.getTotalRequestUnitsConsumed()).sum();

                watch.stop();

                BulkImportResponse bulkImportResponse =
                        new BulkImportResponse(numberOfDocumentsImported, totalRequestUnitsConsumed, watch.elapsed(), failures);

                return Futures.immediateFuture(bulkImportResponse);
            }
        };

        return futureContainer.callAsync(completeAsyncCallback, listeningExecutorService);
    }
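
    // --- Illustrative sketch, not part of the original file ---
    // A minimal example of how a caller inside this class might consume the ListenableFuture
    // returned by executeBulkImportAsyncImpl, assuming it runs on a thread that is allowed to
    // block; the wrapper name importAllBlocking is hypothetical and not part of the library's API.
    private BulkImportResponse importAllBlocking(Collection<String> documents, boolean isUpsert) throws Exception {
        ListenableFuture<BulkImportResponse> future = executeBulkImportAsyncImpl(documents, isUpsert);
        // Blocks until every partition's CongestionController has drained its mini batches
        // and the completion callback above has aggregated the per-partition results.
        return future.get();
    }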