private ListenableFuture<BulkImportResponse> executeBulkImportAsyncImpl(Collection<String> documents, boolean isUpsert)

in bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/DocumentBulkImporter.java [442:556]


    private ListenableFuture<BulkImportResponse> executeBulkImportAsyncImpl(Collection<String> documents,
            boolean isUpsert) throws Exception {
        Stopwatch watch = Stopwatch.createStarted();

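        // Options for the server-side bulk import stored procedure; the isUpsert flag
        // chooses between create and upsert semantics for documents with conflicting ids.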
        BulkImportStoredProcedureOptions options = new BulkImportStoredProcedureOptions(true, true, null, false, isUpsert);

        logger.debug("Bucketing documents ...");

        ConcurrentHashMap<String, Set<String>> documentsToImportByPartition = new ConcurrentHashMap<>();
        ConcurrentHashMap<String, List<List<String>>> miniBatchesToImportByPartition = new ConcurrentHashMap<>();

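        // Pre-size one concurrent bucket and one mini-batch list per partition key range,
        // assuming documents are spread roughly evenly across ranges.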
        for (String partitionKeyRangeId: partitionKeyRangeIds) {
            documentsToImportByPartition.put(partitionKeyRangeId, ConcurrentHashMap.newKeySet(documents.size() / partitionKeyRangeIds.size()));
            miniBatchesToImportByPartition.put(partitionKeyRangeId, new ArrayList<>(1000));
        }

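        // Route each document to its owning partition key range: extract the partition key,
        // compute its effective (hashed) form, and look it up in the collection routing map.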
        documents.parallelStream().forEach(documentAsString -> {
            PartitionKeyInternal partitionKeyValue = DocumentAnalyzer.extractPartitionKeyValue(documentAsString, partitionKeyDefinition);
            String effectivePartitionKey = partitionKeyValue.getEffectivePartitionKeyString(partitionKeyDefinition, true);
            String partitionRangeId = collectionRoutingMap.getRangeByEffectivePartitionKey(effectivePartitionKey).getId();
            documentsToImportByPartition.get(partitionRangeId).add(documentAsString);
        });

        logger.trace("Creating mini batches within each partition bucket");

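        // Greedily pack each partition's bucket into mini batches capped at maxMiniBatchSize bytes.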
        documentsToImportByPartition.entrySet().parallelStream().forEach(entry -> {

            String partitionRangeId = entry.getKey();

            Set<String> documentsToImportInPartition = entry.getValue();

            Iterator<String> it = documentsToImportInPartition.iterator();
            ArrayList<String> currentMiniBatch = new ArrayList<>(500);
            int currentMiniBatchSize = 0;

            while (it.hasNext()) {
                String currentDocument = it.next();
                int currentDocumentSize = getDocumentSizeOrThrow(currentDocument);

                if (currentMiniBatchSize + currentDocumentSize <= maxMiniBatchSize) {
                    // add the document to current batch
                    currentMiniBatch.add(currentDocument);
                    currentMiniBatchSize += currentDocumentSize;
                } else {
                    // adding this document would push the batch past maxMiniBatchSize:
                    // seal the current batch and start a new one seeded with this document
                    miniBatchesToImportByPartition.get(partitionRangeId).add(currentMiniBatch);
                    currentMiniBatch = new ArrayList<>(500);
                    currentMiniBatch.add(currentDocument);
                    currentMiniBatchSize = currentDocumentSize;
                }
            }

            if (!currentMiniBatch.isEmpty()) {
                // flush the last, partially filled mini batch
                miniBatchesToImportByPartition.get(partitionRangeId).add(currentMiniBatch);
            }
        });

        logger.debug("Beginning bulk import within each partition bucket");
        Map<String, BatchInserter> batchInserters = new HashMap<>();
        Map<String, CongestionController> congestionControllers = new HashMap<>();

        logger.debug("Preprocessing took: " + watch.elapsed().toMillis() + " millis");
        List<ListenableFuture<Void>> futures = new ArrayList<>();

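        // One BatchInserter and one CongestionController per partition key range; the
        // collection throughput is split evenly across ranges, and each controller is
        // seeded with the degree of parallelism inferred from previous runs.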
        for (String partitionKeyRangeId: this.partitionKeyRangeIds) {

            BatchInserter batchInserter = new BatchInserter(
                    partitionKeyRangeId,
                    miniBatchesToImportByPartition.get(partitionKeyRangeId),
                    this.client,
                    bulkImportStoredProcLink,
                    options);
            batchInserters.put(partitionKeyRangeId, batchInserter);

            CongestionController cc = new CongestionController(listeningExecutorService,
                    collectionThroughput / partitionKeyRangeIds.size(),
                    partitionKeyRangeId,
                    batchInserter,
                    partitionKeyRangeIdToInferredDegreeOfParallelism.get(partitionKeyRangeId));

            congestionControllers.put(partitionKeyRangeId, cc);

            // kick off all mini-batch inserts for this partition range
            futures.add(cc.executeAllAsync());
        }

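        // Once every partition range completes, collect failures, sum imported counts and
        // RU charges, and record each range's achieved degree of concurrency so the next
        // invocation can start from it.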
        FutureCombiner<Void> futureContainer = Futures.whenAllComplete(futures);
        AsyncCallable<BulkImportResponse> completeAsyncCallback = new AsyncCallable<BulkImportResponse>() {

            @Override
            public ListenableFuture<BulkImportResponse> call() throws Exception {

                List<Exception> failures = new ArrayList<>();

                for (String partitionKeyRangeId: partitionKeyRangeIds) {
                    CongestionController cc = congestionControllers.get(partitionKeyRangeId);
                    failures.addAll(cc.getFailures());
                    partitionKeyRangeIdToInferredDegreeOfParallelism.put(partitionKeyRangeId, cc.getDegreeOfConcurrency());
                }

                int numberOfDocumentsImported = batchInserters.values().stream().mapToInt(b -> b.getNumberOfDocumentsImported()).sum();
                double totalRequestUnitsConsumed = batchInserters.values().stream().mapToDouble(b -> b.getTotalRequestUnitsConsumed()).sum();

                watch.stop();

                BulkImportResponse bulkImportResponse = new BulkImportResponse(
                        numberOfDocumentsImported, totalRequestUnitsConsumed, watch.elapsed(), failures);

                return Futures.immediateFuture(bulkImportResponse);
            }
        };

        return futureContainer.callAsync(completeAsyncCallback, listeningExecutorService);
    }
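
Usage, for context: a minimal sketch of how a caller might drive this path. The builder chain, the importAll entry point, and the response getters below are assumptions about the public surface that delegates to this private method, not confirmed by this excerpt.

    // Hypothetical caller sketch; method names on DocumentBulkImporter and
    // BulkImportResponse are assumptions based on the types seen above.
    DocumentBulkImporter importer = DocumentBulkImporter.builder()
            .from(client, collection)   // assumed builder: a DocumentClient plus the target collection
            .build();

    Collection<String> documents = loadDocumentsAsJsonStrings(); // hypothetical helper
    BulkImportResponse response = importer.importAll(documents, false /* isUpsert */);

    // BulkImportResponse carries the aggregates assembled in the callback above:
    // imported count, total RU charge, elapsed time, and any per-partition failures.
    System.out.println("Imported " + response.getNumberOfDocumentsImported()
            + " documents, consuming " + response.getTotalRequestUnitsConsumed() + " RUs");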