in bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Main.java [66:159]
/**
 * Entry point for the bulk-import sample: reads the target collection's
 * metadata, builds a {@code DocumentBulkImporter}, then imports generated
 * documents in {@code numberOfCheckpoints} batches, printing per-checkpoint
 * and aggregate throughput statistics.
 *
 * <p>Assumes the database and collection already exist. For best throughput
 * it is a good idea to set the connection pool size equal to the number of
 * partitions serving the collection.
 *
 * @param args command line arguments, parsed into a {@code CmdLineConfiguration}
 * @throws Exception on any unrecoverable failure (configuration, client I/O, import)
 */
public static void main(String[] args) throws Exception {
    CmdLineConfiguration cfg = parseCommandLineArgs(args);

    try (DocumentClient client = documentClientFrom(cfg)) {
        // Set retry options high for initialization: reading collection/offer
        // metadata may get throttled on a busy account.
        client.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(120);
        client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(100);

        String collectionLink = String.format("/dbs/%s/colls/%s", cfg.getDatabaseId(), cfg.getCollectionId());
        // This assumes the database and collection already exist.
        DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
        int offerThroughput = getOfferThroughput(client, collection);

        Builder bulkImporterBuilder = DocumentBulkImporter.builder().from(client,
                cfg.getDatabaseId(), cfg.getCollectionId(), collection.getPartitionKey(),
                offerThroughput);

        // Instantiate the bulk importer.
        try (DocumentBulkImporter bulkImporter = bulkImporterBuilder.build()) {
            // Then set retries to 0 to pass throttling/congestion control to the
            // bulk importer; client-side retries would fight its own back-off.
            client.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(0);
            client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(0);

            Stopwatch fromStartToEnd = Stopwatch.createStarted();
            Stopwatch totalWatch = Stopwatch.createUnstarted();
            double totalRequestCharge = 0;
            long totalTimeInMillis = 0;
            long totalNumberOfDocumentsImported = 0;

            for (int i = 0; i < cfg.getNumberOfCheckpoints(); i++) {
                Collection<String> documents = DataMigrationDocumentSource.loadDocuments(
                        cfg.getNumberOfDocumentsForEachCheckpoint(), collection.getPartitionKey());
                if (documents.size() != cfg.getNumberOfDocumentsForEachCheckpoint()) {
                    throw new RuntimeException("not enough documents generated");
                }

                // NOTE: only sum the bulk import time; loading/generating documents
                // is out of the scope of the bulk importer and has to be excluded.
                totalWatch.start();
                BulkImportResponse bulkImportResponse = bulkImporter.importAll(documents, false);
                totalWatch.stop();

                totalNumberOfDocumentsImported += bulkImportResponse.getNumberOfDocumentsImported();
                totalTimeInMillis += bulkImportResponse.getTotalTimeTaken().toMillis();
                totalRequestCharge += bulkImportResponse.getTotalRequestUnitsConsumed();

                printCheckpointStats(bulkImportResponse);

                // Check the number of imported documents to ensure everything was
                // successfully imported; on a shortfall this checkpoint has to be
                // retried with upsert enabled, so surface the failures and stop.
                if (bulkImportResponse.getNumberOfDocumentsImported() != cfg.getNumberOfDocumentsForEachCheckpoint()) {
                    System.err.println("Some documents failed to get inserted in this checkpoint. This checkpoint has to get retried with upsert enabled");
                    System.err.println("Number of surfaced failures: " + bulkImportResponse.getErrors().size());
                    bulkImportResponse.getErrors().forEach(Throwable::printStackTrace);
                    break;
                }
            }

            fromStartToEnd.stop();
            printSummaryStats(fromStartToEnd, totalWatch, totalRequestCharge,
                    totalTimeInMillis, totalNumberOfDocumentsImported);
        } // close bulk importer
    } // closes client
}

/**
 * Prints import statistics for a single checkpoint: document count, elapsed
 * time, RU charge, and derived RU/s and insert/s rates.
 *
 * @param bulkImportResponse the response for the checkpoint just imported
 */
private static void printCheckpointStats(BulkImportResponse bulkImportResponse) {
    System.out.println("##########################################################################################");
    System.out.println("Number of documents inserted in this checkpoint: " + bulkImportResponse.getNumberOfDocumentsImported());
    System.out.println("Import time for this checkpoint in milli seconds " + bulkImportResponse.getTotalTimeTaken().toMillis());
    System.out.println("Total request unit consumed in this checkpoint: " + bulkImportResponse.getTotalRequestUnitsConsumed());
    // Rates divide by elapsed millis; for a sub-millisecond checkpoint this
    // prints Infinity rather than throwing (double division).
    System.out.println("Average RUs/second in this checkpoint: " + bulkImportResponse.getTotalRequestUnitsConsumed() / (0.001 * bulkImportResponse.getTotalTimeTaken().toMillis()));
    System.out.println("Average #Inserts/second in this checkpoint: " + bulkImportResponse.getNumberOfDocumentsImported() / (0.001 * bulkImportResponse.getTotalTimeTaken().toMillis()));
    System.out.println("##########################################################################################");
}

/**
 * Prints the aggregate statistics accumulated across all checkpoints.
 *
 * @param fromStartToEnd stopwatch covering the whole run, including document generation
 * @param totalWatch stopwatch covering only the bulk-import calls
 * @param totalRequestCharge sum of RU charges reported by the importer
 * @param totalTimeInMillis sum of per-checkpoint times reported by the importer API
 * @param totalNumberOfDocumentsImported total documents successfully imported
 */
private static void printSummaryStats(Stopwatch fromStartToEnd, Stopwatch totalWatch,
        double totalRequestCharge, long totalTimeInMillis, long totalNumberOfDocumentsImported) {
    System.out.println("##########################################################################################");
    System.out.println("Total import time including data generation: " + fromStartToEnd.elapsed().toMillis());
    System.out.println("Total import time in milli seconds measured by stopWatch: " + totalWatch.elapsed().toMillis());
    System.out.println("Total import time in milli seconds measured by api : " + totalTimeInMillis);
    System.out.println("Total Number of documents inserted " + totalNumberOfDocumentsImported);
    System.out.println("Total request unit consumed: " + totalRequestCharge);
    System.out.println("Average RUs/second:" + totalRequestCharge / (totalWatch.elapsed().toMillis() * 0.001));
    System.out.println("Average #Inserts/second: " + totalNumberOfDocumentsImported / (totalWatch.elapsed().toMillis() * 0.001));
}