public CompletableFuture streamAsync()

in ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java [435:478]


  /**
   * Writes the given request to the connection's channel and returns a future for its reply.
   *
   * <p>The returned future is handed to the reply map via {@code submitRequest}; presumably it is
   * completed normally by the reply-handling path, which is not visible here (verify against
   * {@code NettyClientReplies}). This method itself only completes it exceptionally:
   * <ul>
   *   <li>with {@link AlreadyClosedException} when no channel is available;</li>
   *   <li>with {@link IOException} when the channel write fails;</li>
   *   <li>with {@link TimeoutIOException} when the future is still not done once the timeout,
   *       scheduled after a successful write, fires.</li>
   * </ul>
   *
   * @param request the data stream request to write
   * @return a future for the reply to {@code request}
   */
  public CompletableFuture<DataStreamReply> streamAsync(DataStreamRequest request) {
    final CompletableFuture<DataStreamReply> replyFuture = new CompletableFuture<>();
    final ClientInvocationId invocationId = ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
    final boolean closing = request.getWriteOptionList().contains(StandardWriteOption.CLOSE);

    final NettyClientReplies.ReplyMap replyMap = replies.getReplyMap(invocationId);
    final NettyClientReplies.RequestEntry requestEntry = new NettyClientReplies.RequestEntry(request);
    final NettyClientReplies.ReplyEntry replyEntry;
    final Channel channel;
    final ChannelFuture writeFuture;
    LOG.debug("{}: write begin {}", this, request);
    // Registering the request in the reply map and issuing the channel write happen
    // under the same lock on replyMap.
    synchronized (replyMap) {
      channel = connection.getChannelUninterruptibly();
      if (channel == null) {
        // No channel: fail right away without registering the request.
        replyFuture.completeExceptionally(new AlreadyClosedException(this + ": Failed to send " + request));
        return replyFuture;
      }
      replyEntry = replyMap.submitRequest(requestEntry, closing, replyFuture);
      // Flush only when the outstanding-request thresholds say so; otherwise just enqueue the write.
      if (outstandingRequests.shouldFlush(flushRequestCountMin, flushRequestBytesMin, request)) {
        writeFuture = channel.writeAndFlush(request);
      } else {
        writeFuture = channel.write(request);
      }
    }

    writeFuture.addListener(written -> {
      if (written.isSuccess()) {
        LOG.debug("{}: write after {}", this, request);

        // Close requests use their own timeout.
        final TimeDuration timeout;
        if (closing) {
          timeout = closeTimeout;
        } else {
          timeout = requestTimeout;
        }
        // Runs on the channel's event loop once the timeout elapses; a no-op if the
        // future has already been completed by then.
        final Runnable onTimeout = () -> {
          if (!replyFuture.isDone()) {
            replyFuture.completeExceptionally(new TimeoutIOException(
                "Timeout " + timeout + ": Failed to send " + request + " via channel " + channel));
            replyMap.fail(requestEntry);
          }
        };
        replyEntry.scheduleTimeout(
            () -> channel.eventLoop().schedule(onTimeout, timeout.getDuration(), timeout.getUnit()));
        return;
      }

      // The write failed: fail the caller's future, then the request's entry in the reply map.
      final IOException e = new IOException(this + ": Failed to send " + request + " to " + channel.remoteAddress(),
          written.cause());
      replyFuture.completeExceptionally(e);
      replyMap.fail(requestEntry);
      LOG.error("Channel write failed", e);
    });
    return replyFuture;
  }