in azure-documentdb-benchmark/src/main/java/com/microsoft/azure/documentdb/benchmark/SimpleTests.java [319:400]
/**
 * Runs the read-throughput benchmark.
 *
 * <p>Builds a {@link ConnectionPolicy} from the configured fields, creates the
 * {@link DocumentClient}, collects the document IDs to read (from the file at
 * {@code docIdFilePath} if set, otherwise the single {@code docId}), and then starts
 * {@code threadCount} worker threads. Each worker issues {@code totalOperations} point
 * reads against one document, chosen by {@code threadIndex % documentIds.size()}, marking
 * {@code successMeter} or {@code failureMeter} per request. The method returns once every
 * worker has finished.
 *
 * <p>If no document IDs can be collected, an error is logged and the method returns
 * without starting any worker.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting for
 *                              the worker threads to finish
 */
private void readThroughput() throws InterruptedException {
    ConnectionPolicy connectionPolicy = new ConnectionPolicy();
    connectionPolicy.setConnectionMode(ConnectionMode.valueOf(connectionMode));
    // Non-positive values mean "not configured": keep the SDK defaults.
    if (connectionPoolSize > 0) {
        connectionPolicy.setMaxPoolSize(connectionPoolSize);
    }
    if (requestTimeout > 0) {
        connectionPolicy.setRequestTimeout(requestTimeout);
    }
    if (idleConnectionTimeout > 0) {
        connectionPolicy.setIdleConnectionTimeout(idleConnectionTimeout);
    }
    connectionPolicy.setEnableEndpointDiscovery(false);
    List<String> regions = Arrays.asList(preferredRegions.split(";"));
    connectionPolicy.setPreferredLocations(regions);
    client = new DocumentClient(endpoint, key, connectionPolicy, ConsistencyLevel.valueOf(consistencyLevel));

    List<String> documentIds = new ArrayList<>();
    if (!StringUtils.isEmpty(docIdFilePath)) {
        // try-with-resources closes both readers even when readLine() throws.
        try (FileReader fileReader = new FileReader(new File(docIdFilePath));
             BufferedReader bufferedReader = new BufferedReader(fileReader)) {
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                // Skip blank lines (e.g. a trailing newline) so they do not become empty IDs.
                if (!line.trim().isEmpty()) {
                    documentIds.add(line);
                }
            }
        } catch (Exception e) {
            // Best effort: any IDs read before the failure are still used below.
            logger.error("Failed to read document IDs list at {}", docIdFilePath, e);
        }
    } else if (!StringUtils.isEmpty(docId)) {
        documentIds.add(docId);
    }

    if (documentIds.isEmpty()) {
        logger.error("Cannot continue with empty Document IDs list.");
        return;
    }

    Thread[] threads = new Thread[threadCount];
    for (int i = 0; i < threadCount; ++i) {
        final int index = i;
        threads[i] = new Thread(new Runnable() {
            @Override
            public void run() {
                LatencyLogger latencyLogger = new LatencyLogger(!StringUtils.isEmpty(logLatencyPath),
                        String.format("%s/%d-thread%03d.log", logLatencyPath, runId, index),
                        warmupRequestCount,
                        logBatchEntryCount,
                        printLatency);

                // The target document depends only on the thread index, so resolve it once
                // instead of on every iteration.
                final String threadDocId = documentIds.get(index % documentIds.size());
                final String documentLink =
                        String.format("dbs/%s/colls/%s/docs/%s", dbName, collectionName, threadDocId);

                long remainingOperations = totalOperations;
                while (remainingOperations-- > 0) {
                    try {
                        RequestOptions options = new RequestOptions();
                        options.setPartitionKey(new PartitionKey(threadDocId));
                        latencyLogger.requestStart();
                        // The resource is fetched but intentionally not used; only the
                        // round trip is being measured.
                        client.readDocument(documentLink, options).getResource();
                        latencyLogger.requestEnd();
                        successMeter.mark();
                    } catch (Exception e) {
                        // NOTE(review): requestEnd() is not called for a failed request;
                        // confirm LatencyLogger tolerates an unmatched requestStart().
                        logger.error(e.getMessage(), e);
                        failureMeter.mark();
                    }
                }
                latencyLogger.writeToLogFile();
            }
        });
        threads[i].start();
    }

    // Every slot was assigned above, so all entries are non-null here.
    for (Thread t : threads) {
        t.join();
    }
}