in benchmark-java/src/main/java/org/apache/pekko/grpc/benchmarks/qps/AsyncClient.java [159:199]
/**
 * Issues unary calls one after another on {@code client} until {@code endTime} is reached,
 * recording the latency of every successful call in a histogram.
 *
 * <p>Only one call is in flight at a time: the next call is started from the completion
 * callback of the previous one. The first call that completes at or after {@code endTime}
 * ends the loop and marks the returned future as done. Any failed call is reported on
 * {@code System.err} and cancels the returned future.
 *
 * @param client  client used to issue the unary calls
 * @param request request sent with every call
 * @param endTime deadline, compared against {@link System#nanoTime()}
 * @return a future for the histogram of recorded latencies (in microseconds)
 */
private Future<Histogram> doUnaryCalls(BenchmarkServiceClient client, final SimpleRequest request,
                                       final long endTime) {
  final Histogram histogram = new Histogram(HISTOGRAM_MAX_VALUE, HISTOGRAM_PRECISION);
  final HistogramFuture future = new HistogramFuture(histogram);
  // Start timestamp of the call currently in flight. It is taken BEFORE the first call is
  // issued so that the first sample covers the whole call, like every later sample does.
  // An AtomicLong is used because the completion callbacks are not guaranteed to run on
  // the thread that created this object.
  final AtomicLong lastCall = new AtomicLong(System.nanoTime());

  // An anonymous class (rather than a lambda) so the callback can re-register itself
  // through 'this' for each subsequent call.
  final BiConsumer<SimpleResponse, Throwable> responseCallback =
      new BiConsumer<SimpleResponse, Throwable>() {
        @Override
        public void accept(SimpleResponse rsp, Throwable t) {
          if (t != null) {
            Status status = Status.fromThrowable(t);
            System.err.println("Encountered an error in unaryCall. Status is " + status);
            t.printStackTrace();
            future.cancel(true);
            return;
          }

          long now = System.nanoTime();
          // Record the latencies in microseconds; 'now' also becomes the start of the next call.
          histogram.recordValue((now - lastCall.getAndSet(now)) / 1000);
          if (endTime > now) {
            client.unaryCall(request).whenComplete(this); // use same BiConsumer instance
          } else {
            future.done();
          }
        }
      };

  // FIXME whenCompleteAsync?
  client.unaryCall(request).whenComplete(responseCallback);
  return future;
}