in grpc-gcp-benchmarks/src/benchmarkTest/java/SpannerTestCases.java [388:447]
/**
 * Opens {@code numOfRpcs} ExecuteStreamingSql calls on one channel without consuming their
 * responses, issues one more call ({@code listSessionsSingleCall}) from a separate thread, and
 * then drains the streams, printing the elapsed time after each step.
 *
 * <p>Judging by the method name, the goal is presumably to observe how the extra call behaves
 * while the open streams occupy the channel; confirm against {@code listSessionsSingleCall}.
 *
 * <p>The channel is always shut down, even when an RPC fails or the thread is interrupted, so a
 * failing run does not leak the channel.
 *
 * @throws InterruptedException if the current thread is interrupted while sleeping, joining the
 *     helper thread, or waiting for the channel to terminate
 */
void testMaxConcurrentStream() throws InterruptedException {
  System.out.println("\nTestMaxConcurrentStream");
  ManagedChannel channel = getChannel();
  try {
    SpannerBlockingStub stub = getBlockingStub(channel);
    Session session =
        stub.createSession(CreateSessionRequest.newBuilder().setDatabase(database).build());
    ExecuteSqlRequest request =
        ExecuteSqlRequest.newBuilder()
            .setSession(session.getName())
            .setSql("select * FROM " + LARGE_TABLE)
            .build();

    // Warm up: run the query NUM_WARMUP times and consume every response.
    for (int i = 0; i < NUM_WARMUP; i++) {
      stub.executeStreamingSql(request).forEachRemaining(ignored -> {});
    }

    // Start concurrent rpc calls. The iterators are kept but deliberately not consumed yet.
    List<Iterator<PartialResultSet>> responses = new ArrayList<>();
    long start = System.currentTimeMillis();
    for (int i = 0; i < numOfRpcs; i++) {
      responses.add(stub.executeStreamingSql(request));
    }
    System.out.println(
        String.format(
            "Started %d ExecuteStreamingSql calls in %dms",
            numOfRpcs, System.currentTimeMillis() - start));

    // Start another rpc call using a new thread while the streams above are still open.
    Thread t = new Thread(() -> listSessionsSingleCall(stub));
    t.start();
    System.out.println("I'm sleeping and will wake up after 2000ms zzzZZZZ.");
    Thread.sleep(2000);
    System.out.println("Good morning!");

    // Free one call by consuming its whole response stream.
    responses.get(0).forEachRemaining(ignored -> {});
    System.out.println(
        String.format("Freed one call in %dms.", System.currentTimeMillis() - start));

    // Free all the remaining calls.
    for (int i = 1; i < responses.size(); i++) {
      responses.get(i).forEachRemaining(ignored -> {});
    }
    System.out.println(
        String.format(
            "Freed %d call(s) in %dms.", numOfRpcs, System.currentTimeMillis() - start));

    t.join();
    // The session is only deleted on the success path: after a failure, undrained streams may
    // still be open on the channel, and this blocking call carries no deadline.
    stub.deleteSession(DeleteSessionRequest.newBuilder().setName(session.getName()).build());
  } finally {
    // Always release the channel. If it does not terminate gracefully within 5 seconds, cancel
    // whatever is still outstanding (e.g. unconsumed streams or the helper thread's call).
    if (!channel.shutdown().awaitTermination(5, TimeUnit.SECONDS)) {
      channel.shutdownNow();
    }
  }
}