in ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java [163:208]
/**
 * Handles one request received from the stream.
 *
 * <p>The request is wrapped in a {@link ReferenceCountedObject} and then takes
 * one of two paths, selected by {@code replyInOrder(request)}:
 * <ul>
 *   <li><b>Unordered</b>: the request is processed and its reply is passed to
 *       {@code handleReply} as soon as the processing future completes.</li>
 *   <li><b>Ordered</b>: the request is recorded as the latest pending request in
 *       {@code previousOnNext}; its reply is handled only after both its own
 *       processing and the previously recorded request's future have completed.</li>
 * </ul>
 *
 * <p>Exceptions thrown synchronously on either path are caught and reported
 * through {@code handleError}; they are not rethrown to the caller.
 *
 * @param request the incoming request
 */
public void onNext(REQUEST request) {
// The retain callback is a no-op; the release callback forwards to release(request)
// only when its flag is true (presumably "reference count reached zero" -- confirm
// against ReferenceCountedObject.wrap).
ReferenceCountedObject<REQUEST> requestRef = ReferenceCountedObject.wrap(request, () -> {}, released -> {
if (released) {
release(request);
}
});
// Unordered path: no chaining with earlier requests.
if (!replyInOrder(request)) {
try {
composeRequest(process(requestRef).thenApply(this::handleReply));
} catch (Exception e) {
// Only synchronous failures reach this point; the request is released directly
// rather than through requestRef.
handleError(e, getCallId(request), isHeartbeat(request));
release(request);
}
return;
}
// Ordered path: atomically publish this request as the latest pending one and
// obtain the one it replaces (absent for the first ordered request).
final PendingServerRequest<REQUEST> current = new PendingServerRequest<>(requestRef);
final long callId = getCallId(current.getRequest());
final boolean isHeartbeat = isHeartbeat(current.getRequest());
final Optional<PendingServerRequest<REQUEST>> previous = Optional.ofNullable(previousOnNext.getAndSet(current));
// With no previous request there is nothing to wait for, so use an already-completed future.
final CompletableFuture<Void> previousFuture = previous.map(PendingServerRequest::getFuture)
.orElse(CompletableFuture.completedFuture(null));
try {
final CompletableFuture<REPLY> f = process(requestRef).exceptionally(e -> {
// Handle asynchronous failures, such as when the RaftServer is paused.
handleError(e, callId, isHeartbeat);
current.getFuture().completeExceptionally(e);
// The failure is converted into a null reply for the next stage.
return null;
}).thenCombine(previousFuture, (reply, v) -> {
// Runs only after both this request's processing and previousFuture completed normally.
// NOTE(review): after an async failure above, reply is null here, so handleReply(null)
// is still invoked -- confirm handleReply tolerates that.
// NOTE(review): if previousFuture completed exceptionally, this function is skipped and
// current's future is not completed by it -- confirm that is intended.
handleReply(reply);
// No effect if the future was already completed exceptionally above.
current.getFuture().complete(null);
return null;
});
composeRequest(f);
} catch (Exception e) {
// Synchronous failure: report it and fail this request's future as well.
handleError(e, callId, isHeartbeat);
current.getFuture().completeExceptionally(e);
} finally {
// The replaced request is released here, immediately and on every outcome;
// the current one stays referenced by previousOnNext.
previous.ifPresent(PendingServerRequest::release);
if (isClosed.get()) {
// Some requests may arrive after onCompleted or onError; ensure they are released.
releaseLast();
}
}
}