private Future doStreamingCalls()

in benchmark-java/src/main/java/org/apache/pekko/grpc/benchmarks/qps/AsyncClient.java [201:244]


  private Future<Histogram> doStreamingCalls(BenchmarkServiceClient client, final SimpleRequest request,
                                                    final long endTime) {
    final Histogram histogram = new Histogram(HISTOGRAM_MAX_VALUE, HISTOGRAM_PRECISION);
    final HistogramFuture future = new HistogramFuture(histogram);

    final AtomicLong lastCall = new AtomicLong();
    lastCall.set(System.nanoTime());
    final AtomicReference<ActorRef> requestIngress = new AtomicReference<>();
    Source<SimpleRequest, NotUsed> requestSource =
        Source.<SimpleRequest> actorRef(2, OverflowStrategy.fail())
        .mapMaterializedValue(ref -> {
          requestIngress.set(ref);
          requestIngress.get().tell(request, ActorRef.noSender());
          return NotUsed.getInstance();
        });
    CompletionStage<Done> done = client.streamingCall(requestSource)
        .runWith(Sink.foreach(rsp -> {

          long now = System.nanoTime();
          // Record the latencies in microseconds
          histogram.recordValue((now - lastCall.get()) / 1000);
          lastCall.set(now);

          if (endTime > now) {
            requestIngress.get().tell(request, ActorRef.noSender());
          } else {
            requestIngress.get().tell(new org.apache.pekko.actor.Status.Success("done"), ActorRef.noSender());
          }
        }), mat);

    done.whenComplete((d, t) -> {
      if (t == null) {
        future.done();
      } else {
        Status status = Status.fromThrowable(t);
        System.err.println("Encountered an error in streamingCall. Status is " + status);
        t.printStackTrace();

        future.cancel(true);
      }
    });

    return future;
  }