public InsertMetrics call()

in bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/BatchInserter.java [137:262]


                /**
                 * Imports the documents of {@code miniBatch} by repeatedly executing the bulk-import
                 * stored procedure until every document has been accepted or {@code cancel} is set.
                 *
                 * <p>Each iteration sends the not-yet-imported tail of the mini batch
                 * ({@code miniBatch.subList(currentDocumentIndex, size)}) and advances the index by
                 * the {@code count} reported in the stored procedure response.
                 *
                 * <p>Failure handling, per attempt:
                 * <ul>
                 *   <li>throttled: sleep for the server-provided retry-after interval, then retry;</li>
                 *   <li>timed out: retry, from then on with a copy of the stored procedure options
                 *       whose last constructor argument is {@code true};</li>
                 *   <li>gone / split / any other failure: abort with a {@link RuntimeException}.</li>
                 * </ul>
                 *
                 * <p>Any exception leaving this method first sets {@code cancel = true}; the import
                 * loop of this method checks that flag before every attempt.
                 *
                 * @return metrics for this mini batch: number of documents imported, elapsed time,
                 *         request units consumed and number of throttled attempts
                 * @throws Exception if the mini batch cannot be imported (non-retriable service
                 *         error, stored procedure failure without progress, or interruption while
                 *         backing off)
                 */
                public InsertMetrics call() throws Exception {

                    try {
                        logger.debug("pki {} importing mini batch started", partitionKeyRangeId);
                        Stopwatch stopwatch = Stopwatch.createStarted();
                        double requestUnitsConsumed = 0;
                        int numberOfThrottles = 0;
                        StoredProcedureResponse response;
                        // Once a request has timed out, every later attempt uses the modified options.
                        boolean timedOut = false;

                        int currentDocumentIndex = 0;

                        while (currentDocumentIndex < miniBatch.size() && !cancel) {
                            logger.debug("pki {} inside for loop, currentDocumentIndex {}", partitionKeyRangeId, currentDocumentIndex);

                            // Only the documents that have not been imported yet are (re)sent.
                            String[] docBatch = miniBatch.subList(currentDocumentIndex, miniBatch.size()).toArray(new String[0]);

                            boolean isThrottled = false;
                            Duration retryAfter = Duration.ZERO;

                            try {

                                logger.debug("pki {}, Trying to import minibatch of {} documenents", partitionKeyRangeId, docBatch.length);

                                if (!timedOut) {
                                    response = client.executeStoredProcedure(bulkImportSprocLink, requestOptions, new Object[] { docBatch, storedProcOptions,  null });
                                } else {
                                    // Same options as the original, except the last argument is forced to true.
                                    // NOTE(review): presumably this makes the retry tolerate documents that
                                    // were already written by the timed-out attempt - confirm against
                                    // BulkImportStoredProcedureOptions.
                                    BulkImportStoredProcedureOptions modifiedStoredProcOptions = new BulkImportStoredProcedureOptions(
                                            storedProcOptions.disableAutomaticIdGeneration,
                                            storedProcOptions.softStopOnConflict,
                                            storedProcOptions.systemCollectionId,
                                            storedProcOptions.enableBsonSchema,
                                            true);

                                    response = client.executeStoredProcedure(
                                            bulkImportSprocLink, requestOptions,
                                            new Object[] { docBatch, modifiedStoredProcOptions, null });
                                }

                                BulkImportStoredProcedureResponse bulkImportResponse = parseFrom(response);

                                if (bulkImportResponse != null) {
                                    if (bulkImportResponse.errorCode != 0) {
                                        logger.warn("pki {} Received response error code {}", partitionKeyRangeId, bulkImportResponse.errorCode);
                                        // An error is fatal only when no document at all was imported;
                                        // partial progress is kept and the remainder is retried.
                                        if (bulkImportResponse.count == 0) {
                                            throw new RuntimeException(
                                                    String.format("Stored proc returned failure %s", bulkImportResponse.errorCode));
                                        }
                                    }

                                    // NOTE(review): a response with errorCode == 0 and count == 0 makes no
                                    // progress and the same batch is re-sent immediately - confirm the
                                    // stored procedure can never return that.
                                    double requestCharge = response.getRequestCharge();
                                    currentDocumentIndex += bulkImportResponse.count;
                                    numberOfDocumentsImported.addAndGet(bulkImportResponse.count);
                                    requestUnitsConsumed += requestCharge;
                                    totalRequestUnitsConsumed.addAndGet(requestCharge);
                                }
                                else {
                                    // NOTE(review): the batch is retried immediately without back-off when
                                    // the response cannot be parsed - confirm this is intended.
                                    logger.warn("pki {} Failed to receive response", partitionKeyRangeId);
                                }

                            } catch (DocumentClientException e) {

                                logger.debug("pki {} Importing minibatch failed", partitionKeyRangeId, e);

                                if (isThrottled(e)) {
                                    logger.debug("pki {} Throttled on partition range id", partitionKeyRangeId);
                                    numberOfThrottles++;
                                    isThrottled = true;
                                    retryAfter = Duration.ofMillis(e.getRetryAfterInMilliseconds());
                                    // will retry again

                                } else if (isTimedOut(e)) {
                                    logger.debug("pki {} Request timed out", partitionKeyRangeId);
                                    timedOut = true;
                                    // will retry again

                                } else if (isGone(e)) {
                                    // there is no value in retrying
                                    if (isSplit(e)) {
                                        String errorMessage = String.format("pki %s is undergoing split, please retry shortly after re-initializing BulkImporter object", partitionKeyRangeId);
                                        logger.error(errorMessage);
                                        // keep the service exception as the cause for diagnostics
                                        throw new RuntimeException(errorMessage, e);
                                    } else {
                                        String errorMessage = String.format("pki %s is gone, please retry shortly after re-initializing BulkImporter object", partitionKeyRangeId);
                                        logger.error(errorMessage);
                                        // keep the service exception as the cause for diagnostics
                                        throw new RuntimeException(errorMessage, e);
                                    }

                                } else {
                                    // there is no value in retrying
                                    String errorMessage = String.format("pki %s failed to import mini-batch. Exception was %s. Status code was %s",
                                            partitionKeyRangeId,
                                            e.getMessage(),
                                            e.getStatusCode());
                                    logger.error(errorMessage, e);
                                    throw new RuntimeException(errorMessage, e);
                                }

                            } catch (Exception e) {
                                // Also reached by the "Stored proc returned failure" RuntimeException
                                // thrown inside the try block above.
                                String errorMessage = String.format("pki %s Failed to import mini-batch. Exception was %s", partitionKeyRangeId,
                                        e.getMessage());
                                logger.error(errorMessage, e);
                                throw new RuntimeException(errorMessage, e);
                            }

                            if (isThrottled) {
                                try {
                                    logger.debug("pki {} throttled going to sleep for {} millis ", partitionKeyRangeId, retryAfter.toMillis());
                                    Thread.sleep(retryAfter.toMillis());
                                } catch (InterruptedException e) {
                                    // Restore the interrupt flag so callers further up the stack
                                    // can still observe the interruption.
                                    Thread.currentThread().interrupt();
                                    throw new RuntimeException(e);
                                }
                            }
                        }

                        logger.debug("pki {} completed", partitionKeyRangeId);

                        stopwatch.stop();
                        InsertMetrics insertMetrics = new InsertMetrics(currentDocumentIndex, stopwatch.elapsed(), requestUnitsConsumed, numberOfThrottles);

                        return insertMetrics;
                    } catch (Exception e) {
                        // Raise the cancel flag before propagating the failure; the import loop
                        // above stops as soon as it observes the flag.
                        cancel = true;
                        throw e;
                    }
                }