public void run()

in benchmark-java/src/main/java/org/apache/pekko/grpc/benchmarks/driver/LoadClient.java [301:339]


    public void run() {
      while (!shutdown) {
        maxOutstanding.acquireUninterruptibly();
        if (!shutdown) {

          final AtomicLong lastCall = new AtomicLong();
          lastCall.set(System.nanoTime());
          final AtomicReference<ActorRef> requestIngress = new AtomicReference<>();
          Source<Messages.SimpleRequest, NotUsed> requestSource =
              Source.<Messages.SimpleRequest>actorRef(2, OverflowStrategy.fail())
                  .mapMaterializedValue(ref -> {
                    requestIngress.set(ref);
                    requestIngress.get().tell(simpleRequest, ActorRef.noSender());
                    return NotUsed.getInstance();
                  });
          CompletionStage<Done> done = stub.streamingCall(requestSource)
              .runWith(Sink.foreach(rsp -> {
                delay(System.nanoTime() - lastCall.get());
                if (shutdown) {
                  requestIngress.get().tell(new org.apache.pekko.actor.Status.Success("done"), ActorRef.noSender());
                  // Must not send another request.
                  return;
                }
                requestIngress.get().tell(simpleRequest, ActorRef.noSender());
                lastCall.set(System.nanoTime());
              }), mat);

          done.whenComplete((d, t) -> {
            if (t == null) {
              maxOutstanding.release();
            } else {
              maxOutstanding.release();
              Level level = shutdown ? Level.FINE : Level.INFO;
              log.log(level, "Error in Async Ping-Pong call", t);
            }
          });
        }
      }
    }