in benchmark-java/src/main/java/org/apache/pekko/grpc/benchmarks/qps/AsyncClient.java [201:244]
private Future<Histogram> doStreamingCalls(BenchmarkServiceClient client, final SimpleRequest request,
final long endTime) {
final Histogram histogram = new Histogram(HISTOGRAM_MAX_VALUE, HISTOGRAM_PRECISION);
final HistogramFuture future = new HistogramFuture(histogram);
final AtomicLong lastCall = new AtomicLong();
lastCall.set(System.nanoTime());
final AtomicReference<ActorRef> requestIngress = new AtomicReference<>();
Source<SimpleRequest, NotUsed> requestSource =
Source.<SimpleRequest> actorRef(2, OverflowStrategy.fail())
.mapMaterializedValue(ref -> {
requestIngress.set(ref);
requestIngress.get().tell(request, ActorRef.noSender());
return NotUsed.getInstance();
});
CompletionStage<Done> done = client.streamingCall(requestSource)
.runWith(Sink.foreach(rsp -> {
long now = System.nanoTime();
// Record the latencies in microseconds
histogram.recordValue((now - lastCall.get()) / 1000);
lastCall.set(now);
if (endTime > now) {
requestIngress.get().tell(request, ActorRef.noSender());
} else {
requestIngress.get().tell(new org.apache.pekko.actor.Status.Success("done"), ActorRef.noSender());
}
}), mat);
done.whenComplete((d, t) -> {
if (t == null) {
future.done();
} else {
Status status = Status.fromThrowable(t);
System.err.println("Encountered an error in streamingCall. Status is " + status);
t.printStackTrace();
future.cancel(true);
}
});
return future;
}