void testMaxConcurrentStream()

in grpc-gcp-benchmarks/src/benchmarkTest/java/SpannerTestCases.java [388:447]


  /**
   * Opens {@code numOfRpcs} ExecuteStreamingSql calls back to back without reading their responses,
   * issues one additional RPC ({@code listSessionsSingleCall}) from a second thread while those
   * streams are still outstanding, and then drains the streams: first one, then the rest.
   *
   * <p>NOTE(review): judging by the name, the intent is to observe how the extra RPC behaves when
   * the channel has many concurrent streams open; confirm against the benchmark's documentation.
   *
   * <p>Progress and timings are printed to standard output. All reported durations are measured
   * from the moment the concurrent calls start being issued, so the "Freed ..." figures include the
   * fixed 2000ms sleep.
   *
   * <p>The channel is always shut down, even if an RPC fails part-way through. The session is
   * deleted only on the success path, after every stream has been drained.
   *
   * @throws InterruptedException if the thread is interrupted while sleeping, while joining the
   *     helper thread, or while waiting for the channel to terminate
   */
  void testMaxConcurrentStream() throws InterruptedException {
    System.out.println("\nTestMaxConcurrentStream");
    ManagedChannel channel = getChannel();
    try {
      SpannerBlockingStub stub = getBlockingStub(channel);
      Session session =
          stub.createSession(CreateSessionRequest.newBuilder().setDatabase(database).build());

      // Warm up: run the same streaming query NUM_WARMUP times, fully consuming each result.
      ExecuteSqlRequest request =
          ExecuteSqlRequest.newBuilder()
              .setSession(session.getName())
              .setSql("select * FROM " + LARGE_TABLE)
              .build();
      for (int i = 0; i < NUM_WARMUP; i++) {
        stub.executeStreamingSql(request).forEachRemaining(ignored -> {});
      }

      // Start concurrent rpc calls. The iterators are kept unread so the calls stay open.
      List<Iterator<PartialResultSet>> responses = new ArrayList<>();
      long start = System.currentTimeMillis();
      for (int i = 0; i < numOfRpcs; i++) {
        responses.add(stub.executeStreamingSql(request));
      }
      System.out.println(
          String.format(
              "Started %d ExecuteStreamingSql calls in %dms",
              numOfRpcs, System.currentTimeMillis() - start));

      // Start another rpc call using a new thread.
      Thread t = new Thread(() -> listSessionsSingleCall(stub));
      t.start();

      System.out.println("I'm sleeping and will wake up after 2000ms zzzZZZZ.");
      Thread.sleep(2000);
      System.out.println("Good morning!");

      // Free one call. Guarded so that a run with no concurrent calls does not fail on get(0).
      if (!responses.isEmpty()) {
        responses.get(0).forEachRemaining(ignored -> {});
        System.out.println(
            String.format("Freed one call in %dms.", System.currentTimeMillis() - start));
      }

      // Free all the remaining calls.
      for (int i = 1; i < responses.size(); i++) {
        responses.get(i).forEachRemaining(ignored -> {});
      }
      System.out.println(
          String.format(
              "Freed %d call(s) in %dms.", numOfRpcs, System.currentTimeMillis() - start));

      t.join();
      stub.deleteSession(DeleteSessionRequest.newBuilder().setName(session.getName()).build());
    } finally {
      // Always release the channel. If calls are still open after the grace period (for example
      // because a failure above left streams undrained), cancel them instead of leaking it.
      channel.shutdown();
      if (!channel.awaitTermination(5, TimeUnit.SECONDS)) {
        channel.shutdownNow();
      }
    }
  }