in grpc-gcp-benchmarks/src/benchmarkTest/java/SpannerClientV1TestCases.java [249:307]
/**
 * Opens {@code numOfRpcs} ExecuteStreamingSql calls without consuming them, issues one more call
 * ({@code listSessionsSingleCall}) from a separate thread, then drains the open streams and
 * prints how long each phase took relative to the moment the streams were started.
 *
 * <p>NOTE(review): judging by the name, this is presumably meant to show how an additional RPC
 * behaves once the channel's concurrent-stream limit is reached - confirm against the benchmark
 * description.
 *
 * @throws InterruptedException if the thread is interrupted while sleeping or while joining the
 *     helper thread
 */
void testMaxConcurrentStream() throws InterruptedException {
  System.out.println("\nTestMaxConcurrentStream");
  SpannerClient spannerClient = getClient();
  CreateSessionRequest sessionRequest =
      CreateSessionRequest.newBuilder().setDatabase(database).build();
  Session session = spannerClient.createSession(sessionRequest);
  String sessionName = session.getName();

  ExecuteSqlRequest query =
      ExecuteSqlRequest.newBuilder()
          .setSession(sessionName)
          .setSql("select * FROM " + LARGE_TABLE)
          .build();

  // Warm up: run the query NUM_WARMUP times, reading every result to the end.
  for (int round = 0; round < NUM_WARMUP; round++) {
    for (Iterator<PartialResultSet> rows =
            spannerClient.executeStreamingSqlCallable().call(query).iterator();
        rows.hasNext(); ) {
      rows.next();
    }
  }

  // Start the concurrent streaming calls; their iterators are kept but not consumed yet.
  List<Iterator<PartialResultSet>> openStreams = new ArrayList<>();
  long startedAtMs = System.currentTimeMillis();
  while (openStreams.size() < numOfRpcs) {
    openStreams.add(spannerClient.executeStreamingSqlCallable().call(query).iterator());
  }
  long launchMs = System.currentTimeMillis() - startedAtMs;
  System.out.println(
      String.format("Started %d ExecuteStreamingSql calls in %d ms", numOfRpcs, launchMs));

  // Issue one more rpc from a separate thread while the streams above are still open.
  Thread listSessionsThread = new Thread(() -> listSessionsSingleCall(spannerClient));
  listSessionsThread.start();
  System.out.println("I'm sleeping and will wake up after 2000ms zzzZZZZ.");
  Thread.sleep(2000);
  System.out.println("Good morning!");

  // Free one call by reading the first stream to completion.
  Iterator<PartialResultSet> firstStream = openStreams.get(0);
  while (firstStream.hasNext()) {
    firstStream.next();
  }
  long firstFreedMs = System.currentTimeMillis() - startedAtMs;
  System.out.println(String.format("Freed one call in %dms.", firstFreedMs));

  // Free all the remaining calls, in the order they were started.
  for (Iterator<PartialResultSet> stream : openStreams.subList(1, openStreams.size())) {
    while (stream.hasNext()) {
      stream.next();
    }
  }
  long allFreedMs = System.currentTimeMillis() - startedAtMs;
  System.out.println(String.format("Freed %d call(s) in %dms.", numOfRpcs, allFreedMs));

  listSessionsThread.join();
  cleanUpClient(spannerClient, sessionName);
}