in benchmarks/tpcc/src/main/java/com/google/cloud/pgadapter/tpcc/BenchmarkApplication.java [78:214]
/**
 * Runs the TPC-C benchmark application: optionally loads the data set and optionally runs the
 * configured benchmark runner.
 *
 * <p>The steps are:
 *
 * <ol>
 *   <li>Start an in-process PGAdapter server if {@code pgAdapterConfiguration.isInProcess()}
 *       returns true. Otherwise the PGAdapter connection URL is taken from the configuration.
 *   <li>If {@code tpccConfiguration.isLoadData()} is set, create the schema and load the data.
 *   <li>If {@code tpccConfiguration.isRunBenchmark()} is set, run the configured benchmark runner
 *       on {@code tpccConfiguration.getBenchmarkThreads()} threads for the configured duration.
 * </ol>
 *
 * <p>An in-process PGAdapter server is always stopped before this method returns or throws.
 *
 * @param args the command line arguments; not used by this method
 * @throws RuntimeException if the configured benchmark runner is unknown, or if an {@link
 *     IOException} occurred (the {@link IOException} is set as the cause)
 * @throws TimeoutException if the benchmark runners did not terminate within 60 seconds after
 *     the benchmark ended
 * @throws Exception if any other step fails
 */
public void run(String... args) throws Exception {
  ProxyServer server = pgAdapterConfiguration.isInProcess() ? startPGAdapter() : null;
  try {
    // The URLs are built inside the try block so that a failure while building them cannot leave
    // the in-process server running.
    String pgadapterConnectionUrl =
        server == null
            ? pgAdapterConfiguration.getConnectionUrl()
            : String.format("jdbc:postgresql://localhost:%d/tpcc", server.getLocalPort());
    String spannerConnectionUrl = buildSpannerConnectionUrl();

    if (tpccConfiguration.isLoadData()) {
      createSchemaAndLoadData(pgadapterConnectionUrl, spannerConnectionUrl);
    }
    if (tpccConfiguration.isRunBenchmark()) {
      runBenchmarkRunners(pgadapterConnectionUrl, spannerConnectionUrl);
    }
  } catch (IOException exception) {
    throw new RuntimeException(exception);
  } finally {
    if (server != null) {
      server.stopServer();
      server.awaitTerminated();
    }
  }
}

/**
 * Builds the Spanner JDBC connection URL from the Spanner and PGAdapter configuration.
 *
 * <p>The minimum and maximum number of sessions are raised to at least the number of benchmark
 * threads. The credentials (if any) are appended after formatting, so that a {@code %} in the
 * credentials value is not interpreted as a format specifier.
 *
 * @return the Spanner JDBC connection URL
 */
private String buildSpannerConnectionUrl() {
  String connectionUrl =
      String.format(
          "jdbc:cloudspanner:/projects/%s/instances/%s/databases/%s?numChannels=%d;minSessions=%d;maxSessions=%d",
          spannerConfiguration.getProject(),
          spannerConfiguration.getInstance(),
          spannerConfiguration.getDatabase(),
          pgAdapterConfiguration.getNumChannels(),
          Math.max(
              pgAdapterConfiguration.getMinSessions(), tpccConfiguration.getBenchmarkThreads()),
          Math.max(
              pgAdapterConfiguration.getMaxSessions(), tpccConfiguration.getBenchmarkThreads()));
  if (pgAdapterConfiguration.getCredentials() != null) {
    connectionUrl += ";credentials=" + pgAdapterConfiguration.getCredentials();
  }
  return connectionUrl;
}

/**
 * Creates the schema and loads the data, printing the load status roughly once per second until
 * the load has finished.
 *
 * <p>The Spanner JDBC URL and the GoogleSQL dialect are used when the configured benchmark runner
 * is the GoogleSQL client library runner. In all other cases the PGAdapter URL and the PostgreSQL
 * dialect are used.
 *
 * @param pgadapterConnectionUrl the JDBC URL for PGAdapter
 * @param spannerConnectionUrl the Spanner JDBC URL
 * @throws Exception if creating the schema or loading the data fails, or if the calling thread is
 *     interrupted while waiting for the load to finish
 */
private void createSchemaAndLoadData(String pgadapterConnectionUrl, String spannerConnectionUrl)
    throws Exception {
  boolean isClientLibGSQLRunner =
      tpccConfiguration.getBenchmarkRunner().equals(TpccConfiguration.CLIENT_LIB_GSQL_RUNNER);
  Dialect dialect = isClientLibGSQLRunner ? Dialect.GOOGLE_STANDARD_SQL : Dialect.POSTGRESQL;
  String loadDataConnectionUrl =
      isClientLibGSQLRunner ? spannerConnectionUrl : pgadapterConnectionUrl;

  System.out.println("Checking schema");
  SchemaService schemaService = new SchemaService(loadDataConnectionUrl, dialect);
  schemaService.createSchema();
  System.out.println("Checked schema, starting data load");

  LOG.info("Starting data load");
  DataLoadStatus status = new DataLoadStatus(tpccConfiguration);
  ExecutorService executor = Executors.newSingleThreadExecutor();
  try {
    Future<Long> loadDataFuture =
        executor.submit(() -> loadData(status, loadDataConnectionUrl));
    // No further tasks will be submitted; the executor terminates once the load is done.
    executor.shutdown();
    Stopwatch watch = Stopwatch.createStarted();
    while (!loadDataFuture.isDone()) {
      //noinspection BusyWait
      Thread.sleep(1_000L);
      status.print(watch.elapsed());
    }
    System.out.printf("Finished loading %d rows\n", loadDataFuture.get());
  } finally {
    // This is a no-op after a normal finish. If waiting was interrupted or failed, it interrupts
    // the load task so that it does not keep running after this method has thrown.
    executor.shutdownNow();
  }
}

/**
 * Runs the configured benchmark runner on {@code tpccConfiguration.getBenchmarkThreads()} threads
 * for {@code tpccConfiguration.getBenchmarkDuration()}, printing the statistics roughly once per
 * second.
 *
 * @param pgadapterConnectionUrl the JDBC URL for PGAdapter
 * @param spannerConnectionUrl the Spanner JDBC URL
 * @throws RuntimeException if the configured benchmark runner is unknown
 * @throws TimeoutException if the runners did not terminate within 60 seconds after the
 *     benchmark ended
 * @throws Exception if setting up or running the benchmark fails
 */
private void runBenchmarkRunners(String pgadapterConnectionUrl, String spannerConnectionUrl)
    throws Exception {
  if (!TpccConfiguration.RUNNERS.contains(tpccConfiguration.getBenchmarkRunner())) {
    throw new RuntimeException(
        "Unknown benchmark runner option: " + tpccConfiguration.getBenchmarkRunner());
  }
  LOG.info("Starting benchmark");
  // Enable the OpenTelemetry metrics in the client library.
  OpenTelemetry openTelemetry = enableOpenTelemetryMetrics();
  Metrics metrics = new Metrics(openTelemetry, createMetricAttributes(spannerConfiguration));
  Statistics statistics = new Statistics(tpccConfiguration);

  int benchmarkThreads = tpccConfiguration.getBenchmarkThreads();
  ExecutorService executor = Executors.newFixedThreadPool(benchmarkThreads);
  try {
    for (int i = 0; i < benchmarkThreads; i++) {
      submitBenchmarkRunner(
          executor, statistics, metrics, pgadapterConnectionUrl, spannerConnectionUrl);
    }
    Stopwatch watch = Stopwatch.createStarted();
    while (watch.elapsed().compareTo(tpccConfiguration.getBenchmarkDuration()) <= 0) {
      //noinspection BusyWait
      Thread.sleep(1_000L);
      statistics.print(watch.elapsed());
    }
  } finally {
    // Stop the runners both when the benchmark duration has elapsed and when anything above
    // failed, so the worker threads never outlive this method unnoticed.
    executor.shutdownNow();
  }
  if (!executor.awaitTermination(60L, TimeUnit.SECONDS)) {
    throw new TimeoutException("Timed out while waiting for benchmark runners to shut down");
  }
}

/**
 * Sets the runner name on the statistics and submits one benchmark runner of the configured type
 * to the executor. This method is called once per benchmark thread.
 *
 * @param executor the executor that runs the benchmark runners
 * @param statistics the statistics that are shared by all runners
 * @param metrics the metrics that are shared by all runners
 * @param pgadapterConnectionUrl the JDBC URL for PGAdapter
 * @param spannerConnectionUrl the Spanner JDBC URL
 * @throws RuntimeException if the configured runner is not handled by this method
 * @throws Exception if creating the runner fails
 */
private void submitBenchmarkRunner(
    ExecutorService executor,
    Statistics statistics,
    Metrics metrics,
    String pgadapterConnectionUrl,
    String spannerConnectionUrl)
    throws Exception {
  if (tpccConfiguration.getBenchmarkRunner().equals(TpccConfiguration.PGADAPTER_JDBC_RUNNER)) {
    // Run PGAdapter benchmark
    statistics.setRunnerName("PGAdapter benchmark");
    executor.submit(
        new JdbcBenchmarkRunner(
            statistics,
            pgadapterConnectionUrl,
            tpccConfiguration,
            pgAdapterConfiguration,
            spannerConfiguration,
            metrics));
  } else if (tpccConfiguration
      .getBenchmarkRunner()
      .equals(TpccConfiguration.SPANNER_JDBC_RUNNER)) {
    // Run Spanner JDBC benchmark
    statistics.setRunnerName("Spanner JDBC benchmark");
    executor.submit(
        new JdbcBenchmarkRunner(
            statistics,
            spannerConnectionUrl,
            tpccConfiguration,
            pgAdapterConfiguration,
            spannerConfiguration,
            metrics));
  } else if (tpccConfiguration
      .getBenchmarkRunner()
      .equals(TpccConfiguration.CLIENT_LIB_PG_RUNNER)) {
    // Run client library PG benchmark
    statistics.setRunnerName("Client library PG benchmark");
    executor.submit(
        new JavaClientBenchmarkRunner(
            statistics,
            tpccConfiguration,
            pgAdapterConfiguration,
            spannerConfiguration,
            metrics,
            Dialect.POSTGRESQL));
  } else if (tpccConfiguration
      .getBenchmarkRunner()
      .equals(TpccConfiguration.CLIENT_LIB_GSQL_RUNNER)) {
    // Run client library GSQL benchmark
    statistics.setRunnerName("Client library GSQL benchmark");
    executor.submit(
        new JavaClientBenchmarkRunner(
            statistics,
            tpccConfiguration,
            pgAdapterConfiguration,
            spannerConfiguration,
            metrics,
            Dialect.GOOGLE_STANDARD_SQL));
  } else {
    // The runner passed the RUNNERS check but has no implementation here. Fail loudly instead
    // of running a benchmark without any runners.
    throw new RuntimeException(
        "Unknown benchmark runner option: " + tpccConfiguration.getBenchmarkRunner());
  }
}