public static void main()

in bulkimport/src/main/java/com/microsoft/azure/documentdb/bulkimport/Main.java [66:159]


    public static void main(String[] args) throws Exception {

        CmdLineConfiguration cfg = parseCommandLineArgs(args);

        try(DocumentClient client = documentClientFrom(cfg)) {

            // set retry options high for initialization
            client.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(120);
            client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(100);

            String collectionLink = String.format("/dbs/%s/colls/%s", cfg.getDatabaseId(), cfg.getCollectionId());
            // this assumes the database and collection already exist
            // it is also a good idea to set the connection pool size equal to the number of
            // partitions serving the collection (see the sketch below)
            DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
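            // A minimal sketch (not part of this sample): the pool size is typically configured on the
            // ConnectionPolicy before the DocumentClient is constructed, for example inside a helper
            // like documentClientFrom(cfg). The endpoint/masterKey names below are placeholders.
            //   ConnectionPolicy policy = new ConnectionPolicy();
            //   policy.setMaxPoolSize(numberOfPartitionsServingTheCollection);
            //   DocumentClient client = new DocumentClient(endpoint, masterKey, policy, ConsistencyLevel.Session);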

            int offerThroughput = getOfferThroughput(client, collection);
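            // Sketch of what a helper like getOfferThroughput typically does (an assumption; the helper
            // body is not shown in this excerpt): query the collection's Offer and read its provisioned
            // throughput, e.g.
            //   FeedResponse<Offer> offers = client.queryOffers(
            //           String.format("SELECT * FROM c WHERE c.offerResourceId = '%s'",
            //                   collection.getResourceId()), null);
            //   int throughput = offers.getQueryIterable().toList().get(0).getContent().getInt("offerThroughput");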
            
            Builder bulkImporterBuilder = DocumentBulkImporter.builder().from(client, 
                    cfg.getDatabaseId(), cfg.getCollectionId(), collection.getPartitionKey(),
                    offerThroughput);

            // instantiates bulk importer
            try(DocumentBulkImporter bulkImporter = bulkImporterBuilder.build()) {
                
                // then set retries to 0 to hand retry control over to the bulk importer
                client.getConnectionPolicy().getRetryOptions().setMaxRetryWaitTimeInSeconds(0);
                client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(0);

                Stopwatch fromStartToEnd = Stopwatch.createStarted();

                Stopwatch totalWatch = Stopwatch.createUnstarted();

                double totalRequestCharge = 0;
                long totalTimeInMillis = 0;
                long totalNumberOfDocumentsImported = 0;

                for(int i = 0; i < cfg.getNumberOfCheckpoints(); i++) {

                    BulkImportResponse bulkImportResponse;

                    // load/generate the JSON documents to import in this checkpoint
                    Collection<String> documents = DataMigrationDocumentSource.loadDocuments(
                            cfg.getNumberOfDocumentsForEachCheckpoint(), collection.getPartitionKey());

                    if (documents.size() != cfg.getNumberOfDocumentsForEachCheckpoint()) {
                        throw new RuntimeException("not enough documents generated");
                    }

                    // NOTE: only the bulk import time is summed here;
                    // loading/generating the documents is outside the scope of the bulk importer and is therefore excluded
                    totalWatch.start();
                    bulkImportResponse = bulkImporter.importAll(documents, false); // second argument is the upsert flag (false = insert only)
                    totalWatch.stop();

                    System.out.println("##########################################################################################");

                    totalNumberOfDocumentsImported += bulkImportResponse.getNumberOfDocumentsImported();
                    totalTimeInMillis += bulkImportResponse.getTotalTimeTaken().toMillis();
                    totalRequestCharge += bulkImportResponse.getTotalRequestUnitsConsumed();

                    // print stats
                    System.out.println("Number of documents inserted in this checkpoint: " + bulkImportResponse.getNumberOfDocumentsImported());
                    System.out.println("Import time for this checkpoint in milli seconds " + bulkImportResponse.getTotalTimeTaken().toMillis());
                    System.out.println("Total request unit consumed in this checkpoint: " + bulkImportResponse.getTotalRequestUnitsConsumed());

                    System.out.println("Average RUs/second in this checkpoint: " + bulkImportResponse.getTotalRequestUnitsConsumed() / (0.001 * bulkImportResponse.getTotalTimeTaken().toMillis()));
                    System.out.println("Average #Inserts/second in this checkpoint: " + bulkImportResponse.getNumberOfDocumentsImported() / (0.001 * bulkImportResponse.getTotalTimeTaken().toMillis()));
                    System.out.println("##########################################################################################");

                    // check the number of imported documents to ensure everything was imported successfully,
                    // i.e. bulkImportResponse.getNumberOfDocumentsImported() == documents.size()
                    if (bulkImportResponse.getNumberOfDocumentsImported() != cfg.getNumberOfDocumentsForEachCheckpoint()) {
                        System.err.println("Some documents failed to get inserted in this checkpoint. This checkpoint has to get retried with upsert enabled");
                        System.err.println("Number of surfaced failures: " + bulkImportResponse.getErrors().size());
                        for(int j = 0; j < bulkImportResponse.getErrors().size(); j++) {
                            bulkImportResponse.getErrors().get(j).printStackTrace();
                        }
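                        // One possible recovery (a sketch, not part of this sample): re-import the same
                        // batch with the upsert flag enabled, so documents that already made it in are
                        // overwritten instead of failing as conflicts, e.g.
                        //   BulkImportResponse retryResponse = bulkImporter.importAll(documents, true);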
                        break;
                    }
                }

                fromStartToEnd.stop();

                // print average stats
                System.out.println("##########################################################################################");
                System.out.println("Total import time including data generation: " + fromStartToEnd.elapsed().toMillis());
                System.out.println("Total import time in milli seconds measured by stopWatch: " + totalWatch.elapsed().toMillis());
                System.out.println("Total import time in milli seconds measured by api : " + totalTimeInMillis);
                System.out.println("Total Number of documents inserted " + totalNumberOfDocumentsImported);
                System.out.println("Total request unit consumed: " + totalRequestCharge);
                System.out.println("Average RUs/second:" + totalRequestCharge / (totalWatch.elapsed().toMillis() * 0.001));
                System.out.println("Average #Inserts/second: " + totalNumberOfDocumentsImported / (totalWatch.elapsed().toMillis() * 0.001));

            } // close bulk importer
        } // closes client
    }