in ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java [435:478]
public CompletableFuture<DataStreamReply> streamAsync(DataStreamRequest request) {
final CompletableFuture<DataStreamReply> f = new CompletableFuture<>();
ClientInvocationId clientInvocationId = ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
final boolean isClose = request.getWriteOptionList().contains(StandardWriteOption.CLOSE);
final NettyClientReplies.ReplyMap replyMap = replies.getReplyMap(clientInvocationId);
final ChannelFuture channelFuture;
final Channel channel;
final NettyClientReplies.RequestEntry requestEntry = new NettyClientReplies.RequestEntry(request);
final NettyClientReplies.ReplyEntry replyEntry;
LOG.debug("{}: write begin {}", this, request);
synchronized (replyMap) {
channel = connection.getChannelUninterruptibly();
if (channel == null) {
f.completeExceptionally(new AlreadyClosedException(this + ": Failed to send " + request));
return f;
}
replyEntry = replyMap.submitRequest(requestEntry, isClose, f);
final Function<DataStreamRequest, ChannelFuture> writeMethod = outstandingRequests.shouldFlush(
flushRequestCountMin, flushRequestBytesMin, request)? channel::writeAndFlush: channel::write;
channelFuture = writeMethod.apply(request);
}
channelFuture.addListener(future -> {
if (!future.isSuccess()) {
final IOException e = new IOException(this + ": Failed to send " + request + " to " + channel.remoteAddress(),
future.cause());
f.completeExceptionally(e);
replyMap.fail(requestEntry);
LOG.error("Channel write failed", e);
} else {
LOG.debug("{}: write after {}", this, request);
final TimeDuration timeout = isClose ? closeTimeout : requestTimeout;
replyEntry.scheduleTimeout(() -> channel.eventLoop().schedule(() -> {
if (!f.isDone()) {
f.completeExceptionally(new TimeoutIOException(
"Timeout " + timeout + ": Failed to send " + request + " via channel " + channel));
replyMap.fail(requestEntry);
}
}, timeout.getDuration(), timeout.getUnit()));
}
});
return f;
}