private void readThroughput()

in azure-documentdb-benchmark/src/main/java/com/microsoft/azure/documentdb/benchmark/SimpleTests.java [319:400]


    /**
     * Runs the read-throughput benchmark.
     *
     * <p>Builds a {@link ConnectionPolicy} from the configured fields, creates the
     * {@code client}, collects the document ids to read (from {@code docIdFilePath}
     * when set, otherwise the single {@code docId}), and then starts
     * {@code threadCount} worker threads. Each worker issues {@code totalOperations}
     * reads against one document, chosen by {@code threadIndex % documentIds.size()},
     * marking {@code successMeter} or {@code failureMeter} per request. The method
     * returns after every worker has finished.
     *
     * <p>If no document id could be obtained, an error is logged and the method
     * returns without starting any worker.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting for the worker threads to finish
     */
    private void readThroughput() throws InterruptedException {
        ConnectionPolicy connectionPolicy = new ConnectionPolicy();
        connectionPolicy.setConnectionMode(ConnectionMode.valueOf(connectionMode));
        // Non-positive values leave the corresponding policy setting untouched.
        if (connectionPoolSize > 0) {
            connectionPolicy.setMaxPoolSize(connectionPoolSize);
        }
        if (requestTimeout > 0) {
            connectionPolicy.setRequestTimeout(requestTimeout);
        }
        if (idleConnectionTimeout > 0) {
            connectionPolicy.setIdleConnectionTimeout(idleConnectionTimeout);
        }
        connectionPolicy.setEnableEndpointDiscovery(false);
        List<String> regions = Arrays.asList(preferredRegions.split(";"));
        connectionPolicy.setPreferredLocations(regions);
        client = new DocumentClient(endpoint, key, connectionPolicy, ConsistencyLevel.valueOf(consistencyLevel));

        final List<String> documentIds = new ArrayList<>();

        if (!StringUtils.isEmpty(docIdFilePath)) {
            // try-with-resources closes the readers even when readLine() fails.
            try (FileReader fileReader = new FileReader(new File(docIdFilePath));
                 BufferedReader bufferedReader = new BufferedReader(fileReader)) {
                String line;
                while ((line = bufferedReader.readLine()) != null) {
                    // Skip blank lines (e.g. a trailing newline) so they do not
                    // become empty document ids that can only produce failed reads.
                    String candidate = line.trim();
                    if (!candidate.isEmpty()) {
                        documentIds.add(candidate);
                    }
                }
            } catch (Exception e) {
                // Best effort: keep whatever ids were read before the failure.
                logger.error("Failed to read document IDs list at {}", docIdFilePath, e);
            }
        } else if (!StringUtils.isEmpty(docId)) {
            documentIds.add(docId);
        }

        if (documentIds.isEmpty()) {
            logger.error("Cannot continue with empty Document IDs list.");
            return;
        }

        Thread[] threads = new Thread[threadCount];

        for (int i = 0; i < threadCount; ++i) {
            final int index = i;
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    LatencyLogger latencyLogger = new LatencyLogger(!StringUtils.isEmpty(logLatencyPath),
                            String.format("%s/%d-thread%03d.log", logLatencyPath, runId, index),
                            warmupRequestCount,
                            logBatchEntryCount,
                            printLatency);

                    // Each worker reads the same document for its whole run, so the
                    // id and link are computed once instead of on every iteration.
                    final String targetDocId = documentIds.get(index % documentIds.size());
                    final String documentLink =
                            String.format("dbs/%s/colls/%s/docs/%s", dbName, collectionName, targetDocId);

                    long remainingOperations = totalOperations;
                    while (remainingOperations-- > 0) {
                        try {
                            // A fresh RequestOptions per request, as in the original flow.
                            RequestOptions options = new RequestOptions();
                            options.setPartitionKey(new PartitionKey(targetDocId));
                            latencyLogger.requestStart();
                            client.readDocument(documentLink, options).getResource();
                            // Not reached when the read throws; the failure is only metered.
                            latencyLogger.requestEnd();
                            successMeter.mark();
                        } catch (Exception e) {
                            logger.error(e.getMessage(), e);
                            failureMeter.mark();
                        }
                    }
                    latencyLogger.writeToLogFile();
                }
            });
            threads[i].start();
        }

        for (Thread t : threads) {
            if (t != null) {
                t.join();
            }
        }
    }