in bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/BatchInserter.java [137:262]
/**
 * Imports the documents of {@code miniBatch} by repeatedly executing the bulk-import stored
 * procedure until every document has been accepted or the {@code cancel} flag is set.
 *
 * <p>Each attempt sends the not-yet-imported tail of the mini-batch. The stored procedure
 * reports how many documents it accepted; that count advances the cursor, so a partially
 * processed attempt is resumed from the first unprocessed document.
 *
 * <p>Failure handling per attempt:
 * <ul>
 *   <li>throttled: sleep for the retry-after interval reported by the service, then retry;</li>
 *   <li>timed out: retry, from then on with modified stored-procedure options;</li>
 *   <li>gone / split, or any other failure: abort with a {@link RuntimeException}.</li>
 * </ul>
 *
 * <p>On any exception leaving this method the {@code cancel} flag is set to {@code true}
 * before the exception is rethrown.
 *
 * @return metrics for this mini-batch: number of documents imported, elapsed time,
 *         request units consumed and number of throttled attempts
 * @throws RuntimeException if the stored procedure reports an error without importing any
 *         document, the request fails in a non-retriable way, or the thread is interrupted
 *         while waiting out a throttle (the interrupt flag is restored in that case)
 * @throws Exception declared by the method signature; failures raised here are unchecked
 */
public InsertMetrics call() throws Exception {
    try {
        logger.debug("pki {} importing mini batch started", partitionKeyRangeId);
        Stopwatch stopwatch = Stopwatch.createStarted();
        double requestUnitsConsumed = 0;
        int numberOfThrottles = 0;
        boolean timedOut = false;
        int currentDocumentIndex = 0;

        while (currentDocumentIndex < miniBatch.size() && !cancel) {
            logger.debug("pki {} inside for loop, currentDocumentIndex {}", partitionKeyRangeId, currentDocumentIndex);

            // Only the documents that have not been accepted yet are sent.
            String[] docBatch = miniBatch.subList(currentDocumentIndex, miniBatch.size()).toArray(new String[0]);
            boolean throttled = false;
            Duration retryAfter = Duration.ZERO;

            try {
                logger.debug("pki {}, Trying to import minibatch of {} documenents", partitionKeyRangeId, docBatch.length);

                // After a timeout the same options are re-sent with the last constructor flag
                // forced to true.
                // NOTE(review): presumably this lets the stored procedure tolerate documents
                // already written by the timed-out attempt - confirm against
                // BulkImportStoredProcedureOptions.
                Object effectiveOptions = timedOut
                        ? new BulkImportStoredProcedureOptions(
                                storedProcOptions.disableAutomaticIdGeneration,
                                storedProcOptions.softStopOnConflict,
                                storedProcOptions.systemCollectionId,
                                storedProcOptions.enableBsonSchema,
                                true)
                        : storedProcOptions;

                StoredProcedureResponse response = client.executeStoredProcedure(
                        bulkImportSprocLink, requestOptions,
                        new Object[] { docBatch, effectiveOptions, null });

                BulkImportStoredProcedureResponse bulkImportResponse = parseFrom(response);
                if (bulkImportResponse != null) {
                    if (bulkImportResponse.errorCode != 0) {
                        logger.warn("pki {} Received response error code {}", partitionKeyRangeId, bulkImportResponse.errorCode);
                        // An error with zero progress cannot be resumed; an error after partial
                        // progress is retried from the new cursor position.
                        if (bulkImportResponse.count == 0) {
                            throw new RuntimeException(
                                    String.format("Stored proc returned failure %s", bulkImportResponse.errorCode));
                        }
                    }

                    double requestCharge = response.getRequestCharge();
                    currentDocumentIndex += bulkImportResponse.count;
                    numberOfDocumentsImported.addAndGet(bulkImportResponse.count);
                    requestUnitsConsumed += requestCharge;
                    totalRequestUnitsConsumed.addAndGet(requestCharge);
                } else {
                    // NOTE(review): an unparsable response is retried immediately, with no
                    // back-off and no attempt limit.
                    logger.warn("pki {} Failed to receive response", partitionKeyRangeId);
                }
            } catch (DocumentClientException e) {
                logger.debug("pki {} Importing minibatch failed", partitionKeyRangeId, e);

                if (isThrottled(e)) {
                    logger.debug("pki {} Throttled on partition range id", partitionKeyRangeId);
                    numberOfThrottles++;
                    throttled = true;
                    retryAfter = Duration.ofMillis(e.getRetryAfterInMilliseconds());
                    // will retry again
                } else if (isTimedOut(e)) {
                    logger.debug("pki {} Request timed out", partitionKeyRangeId);
                    timedOut = true;
                    // will retry again
                } else if (isGone(e)) {
                    // there is no value in retrying
                    String errorMessage = isSplit(e)
                            ? String.format("pki %s is undergoing split, please retry shortly after re-initializing BulkImporter object", partitionKeyRangeId)
                            : String.format("pki %s is gone, please retry shortly after re-initializing BulkImporter object", partitionKeyRangeId);
                    logger.error(errorMessage);
                    // Keep the service exception as the cause so callers can inspect it.
                    throw new RuntimeException(errorMessage, e);
                } else {
                    // there is no value in retrying
                    String errorMessage = String.format("pki %s failed to import mini-batch. Exception was %s. Status code was %s",
                            partitionKeyRangeId,
                            e.getMessage(),
                            e.getStatusCode());
                    logger.error(errorMessage, e);
                    throw new RuntimeException(errorMessage, e);
                }
            } catch (Exception e) {
                String errorMessage = String.format("pki %s Failed to import mini-batch. Exception was %s", partitionKeyRangeId,
                        e.getMessage());
                logger.error(errorMessage, e);
                throw new RuntimeException(errorMessage, e);
            }

            if (throttled) {
                try {
                    logger.debug("pki {} throttled going to sleep for {} millis ", partitionKeyRangeId, retryAfter.toMillis());
                    Thread.sleep(retryAfter.toMillis());
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the owning executor still sees the interruption.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }

        logger.debug("pki {} completed", partitionKeyRangeId);
        stopwatch.stop();
        return new InsertMetrics(currentDocumentIndex, stopwatch.elapsed(), requestUnitsConsumed, numberOfThrottles);
    } catch (Exception e) {
        // Set the cancel flag before propagating the failure.
        cancel = true;
        throw e;
    }
}