public void onNext()

in ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java [163:208]


    /**
     * Handles one incoming request.
     * <p>
     * The request is wrapped in a {@link ReferenceCountedObject} and passed to {@code process}.
     * When {@code replyInOrder(request)} is false, the reply is handed to {@code handleReply}
     * as soon as processing completes. Otherwise the reply is additionally gated on the future
     * of the request previously stored in {@code previousOnNext}, so that {@code handleReply}
     * for this request is invoked only after the previous request's future has completed.
     *
     * @param request the incoming request
     */
    public void onNext(REQUEST request) {
      // Wrap the request; the third argument calls release(request) when invoked with true.
      // NOTE(review): the second argument (a no-op) is presumably the retain hook -- confirm
      // against ReferenceCountedObject.wrap.
      ReferenceCountedObject<REQUEST> requestRef = ReferenceCountedObject.wrap(request, () -> {}, released -> {
        if (released) {
          release(request);
        }
      });

      // Unordered path: no chaining on the previous request.
      if (!replyInOrder(request)) {
        try {
          composeRequest(process(requestRef).thenApply(this::handleReply));
        } catch (Exception e) {
          // Only exceptions thrown synchronously by process/composeRequest are caught here;
          // the request is released explicitly in that case.
          handleError(e, getCallId(request), isHeartbeat(request));
          release(request);
        }
        return;
      }

      // Ordered path: register this request as the latest one and fetch the one it replaces.
      final PendingServerRequest<REQUEST> current = new PendingServerRequest<>(requestRef);
      // Capture the call id and heartbeat flag up front so the callbacks below can use them.
      final long callId = getCallId(current.getRequest());
      final boolean isHeartbeat = isHeartbeat(current.getRequest());
      // Atomically swap in the current request; 'previous' is empty when there was none stored.
      final Optional<PendingServerRequest<REQUEST>> previous = Optional.ofNullable(previousOnNext.getAndSet(current));
      // With no previous request, use an already-completed future so the combine below is not delayed.
      final CompletableFuture<Void> previousFuture = previous.map(PendingServerRequest::getFuture)
          .orElse(CompletableFuture.completedFuture(null));
      try {
        final CompletableFuture<REPLY> f = process(requestRef).exceptionally(e -> {
          // Handle cases, such as RaftServer is paused
          handleError(e, callId, isHeartbeat);
          current.getFuture().completeExceptionally(e);
          // The failure is converted to a null reply, so the combined stage below still runs.
          return null;
        }).thenCombine(previousFuture, (reply, v) -> {
          // Runs only after previousFuture has also completed normally.
          // NOTE(review): after a processing failure, reply is null here -- confirm that
          // handleReply tolerates a null argument.
          // NOTE(review): if previousFuture completes exceptionally, this callback is skipped
          // and current's future is not completed by this block -- confirm this is intended.
          handleReply(reply);
          // No effect if the future was already completed exceptionally above.
          current.getFuture().complete(null);
          return null;
        });
        composeRequest(f);
      } catch (Exception e) {
        // Synchronous failure from process/composeRequest: report it and fail this request's future.
        handleError(e, callId, isHeartbeat);
        current.getFuture().completeExceptionally(e);
      } finally {
        // Release the previous pending request (if any), whether or not the code above succeeded.
        previous.ifPresent(PendingServerRequest::release);
        if (isClosed.get()) {
          // Some requests may come after onCompleted or onError, ensure they're released.
          releaseLast();
        }
      }
    }