in gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java [182:310]
public static void main(final String[] args) {
final Map<String,Object> options = ElementHelper.asMap(args);
final boolean noExit = Boolean.parseBoolean(options.getOrDefault("noExit", "false").toString());
final int parallelism = Integer.parseInt(options.getOrDefault("parallelism", "16").toString());
final BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("profiler-%d").build();
final ExecutorService executor = Executors.newFixedThreadPool(parallelism, threadFactory);
final TestType testType = TestType.values()[(Integer.parseInt(options.getOrDefault("testType", "1").toString()) % TestType.values().length)];
final String host = options.getOrDefault("host", "localhost").toString();
final int minExpectedRps = Integer.parseInt(options.getOrDefault("minExpectedRps", "1000").toString());
final int timeout = Integer.parseInt(options.getOrDefault("timeout", "1200000").toString());
final int warmups = Integer.parseInt(options.getOrDefault("warmups", "5").toString());
final int executions = Integer.parseInt(options.getOrDefault("executions", "10").toString());
final int nioPoolSize = Integer.parseInt(options.getOrDefault("nioPoolSize", "1").toString());
final int requests = Integer.parseInt(options.getOrDefault("requests", "10000").toString());
final int maxConnectionPoolSize = Integer.parseInt(options.getOrDefault("maxConnectionPoolSize", "256").toString());
final int maxWaitForConnection = Integer.parseInt(options.getOrDefault("maxWaitForConnection", "3000").toString());
final int workerPoolSize = Integer.parseInt(options.getOrDefault("workerPoolSize", Runtime.getRuntime().availableProcessors() * 2).toString());
final int tooSlowThreshold = Integer.parseInt(options.getOrDefault("tooSlowThreshold", "125").toString());
final String serializer = options.getOrDefault("serializer", Serializers.GRAPHBINARY_V4.name()).toString();
final int pauseBetweenRuns = Integer.parseInt(options.getOrDefault("pauseBetweenRuns", "1000").toString());
final boolean suppressStackTraces = Boolean.parseBoolean(options.getOrDefault("suppressStackTraces", "false").toString());
final boolean exercise = Boolean.parseBoolean(options.getOrDefault("exercise", "false").toString());
final String script = options.getOrDefault("script", "g.inject(1)").toString();
final Cluster cluster = Cluster.build(host)
.maxConnectionPoolSize(maxConnectionPoolSize)
.nioPoolSize(nioPoolSize)
.maxWaitForConnection(maxWaitForConnection)
.serializer(Serializers.valueOf(serializer))
.workerPoolSize(workerPoolSize).create();
try {
if (TestType.LATENCY == testType) {
System.out.println("-----------------------LATENCY TEST SELECTED----------------------");
} else {
System.out.println("---------------------THROUGHPUT TEST SELECTED---------------------");
}
if (exercise) {
System.out.println("--------------------------INITIALIZATION--------------------------");
final Client client = cluster.connect();
client.submit("graph.clear()").all().join();
System.out.println("Cleared existing 'graph'");
client.submit("TinkerFactory.generateModern(graph)").all().join();
client.close();
System.out.println("Modern graph loaded");
}
if (TestType.THROUGHPUT == testType) {
final Object fileName = options.get("store");
final File f = null == fileName ? null : new File(fileName.toString());
if (f != null && f.length() == 0) {
try (final PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(f, true)))) {
writer.println("parallelism\tnioPoolSize\tmaxConnectionPoolSize\tworkerPoolSize\trequestPerSecond");
}
}
// not much point to continuing with a line of tests if we can't get at least minExpectedRps.
final AtomicBoolean meetsRpsExpectation = new AtomicBoolean(true);
System.out.println("---------------------------WARMUP CYCLE---------------------------");
for (int ix = 0; ix < warmups && meetsRpsExpectation.get(); ix++) {
final long averageRequestsPerSecond = new ProfilingApplication("warmup-" + (ix + 1), cluster, 1000, executor, script, tooSlowThreshold, exercise, suppressStackTraces).executeThroughput();
meetsRpsExpectation.set(averageRequestsPerSecond >= minExpectedRps);
TimeUnit.MILLISECONDS.sleep(pauseBetweenRuns); // pause between executions
}
final AtomicBoolean exceededTimeout = new AtomicBoolean(false);
long totalRequestsPerSecond = 0;
// no need to execute this if we didn't pass the basic expectation in the warmups
if (exercise || meetsRpsExpectation.get()) {
final long start = System.nanoTime();
System.out.println("----------------------------TEST CYCLE----------------------------");
for (int ix = 0; ix < executions && !exceededTimeout.get(); ix++) {
totalRequestsPerSecond += new ProfilingApplication("test-" + (ix + 1), cluster, requests, executor, script, tooSlowThreshold, exercise, suppressStackTraces).executeThroughput();
exceededTimeout.set((System.nanoTime() - start) > TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS));
TimeUnit.MILLISECONDS.sleep(pauseBetweenRuns); // pause between executions
}
}
final int averageRequestPerSecond = !meetsRpsExpectation.get() || exceededTimeout.get() ? 0 : Math.round(totalRequestsPerSecond / executions);
System.out.println(String.format("avg req/sec: %s", averageRequestPerSecond));
if (f != null) {
try (final PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(f, true)))) {
writer.println(String.join("\t", String.valueOf(parallelism), String.valueOf(nioPoolSize), String.valueOf(maxConnectionPoolSize), String.valueOf(workerPoolSize), String.valueOf(averageRequestPerSecond)));
}
}
} else if (TestType.LATENCY == testType) {
final AtomicBoolean meetsTimeoutExpectation = new AtomicBoolean(true);
System.out.println("---------------------------WARMUP CYCLE---------------------------");
for (int ix = 0; ix < warmups && meetsTimeoutExpectation.get(); ix++) {
final double latency = new ProfilingApplication("warmup-" + (ix + 1), cluster, 1000, executor, script, tooSlowThreshold, exercise, suppressStackTraces).executeLatency();
meetsTimeoutExpectation.set(latency < timeout);
TimeUnit.MILLISECONDS.sleep(pauseBetweenRuns); // pause between executions
}
final AtomicBoolean exceededTimeout = new AtomicBoolean(false);
double totalTime = 0;
// no need to execute this if we didn't pass the basic expectation in the warmups
if (exercise || meetsTimeoutExpectation.get()) {
final long start = System.nanoTime();
System.out.println("----------------------------TEST CYCLE----------------------------");
for (int ix = 0; ix < executions && !exceededTimeout.get(); ix++) {
totalTime += new ProfilingApplication("test-" + (ix + 1), cluster, requests, executor, script, tooSlowThreshold, exercise, suppressStackTraces).executeLatency();
exceededTimeout.set((System.nanoTime() - start) > TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS));
TimeUnit.MILLISECONDS.sleep(pauseBetweenRuns); // pause between executions
}
}
final double averageLatency = !meetsTimeoutExpectation.get() || exceededTimeout.get() ? 0 : (totalTime / executions);
System.out.println(String.format("avg latency (sec/req): %s", averageLatency));
} else {
System.out.println("Encountered unknown testType. Please enter a valid value and try again.");
}
if (!noExit) System.exit(0);
} catch (Exception ex) {
ex.printStackTrace();
if (!noExit) System.exit(1);
} finally {
executor.shutdown();
cluster.close();
}
}