in azure-documentdb-benchmark/src/main/java/com/microsoft/azure/documentdb/benchmark/SimpleTests.java [259:317]
private void writeThroughput() throws InterruptedException {
ConnectionPolicy connectionPolicy = new ConnectionPolicy();
connectionPolicy.setConnectionMode(ConnectionMode.valueOf(connectionMode));
if (connectionPoolSize > 0) {
connectionPolicy.setMaxPoolSize(connectionPoolSize);
}
if (requestTimeout > 0) {
connectionPolicy.setRequestTimeout(requestTimeout);
}
if (idleConnectionTimeout > 0) {
connectionPolicy.setIdleConnectionTimeout(idleConnectionTimeout);
}
connectionPolicy.setEnableEndpointDiscovery(false);
client = new DocumentClient(endpoint, key, connectionPolicy, ConsistencyLevel.valueOf(consistencyLevel));
Thread[] threads = new Thread[threadCount];
final String collectionLink = String.format("dbs/%s/colls/%s", dbName, collectionName);
for (int i = 0; i < threadCount; ++i) {
final int index = i;
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
LatencyLogger latencyLogger = new LatencyLogger(!StringUtils.isEmpty(logLatencyPath),
String.format("%s/%d-thread%03d", logLatencyPath, runId, index),
warmupRequestCount,
logBatchEntryCount,
printLatency);
long remainingOperations = totalOperations;
while (remainingOperations-- > 0) {
try {
Document newDoc = new Document();
String idString = TB_GENERATOR.generate().toString();
newDoc.setId(idString);
newDoc.set(pKey, idString);
latencyLogger.requestStart();
Document createdDoc = client.upsertDocument(collectionLink,
newDoc, null, true).getResource();
latencyLogger.requestEnd();
successMeter.mark();
} catch (Exception e) {
failureMeter.mark();
logger.error(e.getMessage(), e);
}
}
latencyLogger.writeToLogFile();
}
});
threads[i].start();
}
for (Thread t : threads) {
if (t != null) {
t.join();
}
}
}