in benchmark-java/src/main/java/org/apache/pekko/grpc/benchmarks/driver/LoadClient.java [301:339]
public void run() {
while (!shutdown) {
maxOutstanding.acquireUninterruptibly();
if (!shutdown) {
final AtomicLong lastCall = new AtomicLong();
lastCall.set(System.nanoTime());
final AtomicReference<ActorRef> requestIngress = new AtomicReference<>();
Source<Messages.SimpleRequest, NotUsed> requestSource =
Source.<Messages.SimpleRequest>actorRef(2, OverflowStrategy.fail())
.mapMaterializedValue(ref -> {
requestIngress.set(ref);
requestIngress.get().tell(simpleRequest, ActorRef.noSender());
return NotUsed.getInstance();
});
CompletionStage<Done> done = stub.streamingCall(requestSource)
.runWith(Sink.foreach(rsp -> {
delay(System.nanoTime() - lastCall.get());
if (shutdown) {
requestIngress.get().tell(new org.apache.pekko.actor.Status.Success("done"), ActorRef.noSender());
// Must not send another request.
return;
}
requestIngress.get().tell(simpleRequest, ActorRef.noSender());
lastCall.set(System.nanoTime());
}), mat);
done.whenComplete((d, t) -> {
if (t == null) {
maxOutstanding.release();
} else {
maxOutstanding.release();
Level level = shutdown ? Level.FINE : Level.INFO;
log.log(level, "Error in Async Ping-Pong call", t);
}
});
}
}
}