private void readImpl()

in ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java [433:500]


  private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
      CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputImpl>, IOException> getStreams) {
    final boolean close = request.getWriteOptionList().contains(StandardWriteOption.CLOSE);
    ClientInvocationId key =  ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());

    // add to ChannelMap
    final ChannelId channelId = ctx.channel().id();
    channels.add(channelId, key);

    final StreamInfo info;
    if (request.getType() == Type.STREAM_HEADER) {
      final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(
          () -> newStreamInfo(request.slice(), getStreams));
      info = streams.computeIfAbsent(key, id -> supplier.get());
      if (!supplier.isInitialized()) {
        throw new IllegalStateException("Failed to create a new stream for " + request
            + " since a stream already exists Key: " + key + " StreamInfo:" + info);
      }
      getMetrics().onRequestCreate(RequestType.HEADER);
    } else if (close) {
      info = Optional.ofNullable(streams.remove(key)).orElseThrow(
          () -> new IllegalStateException("Failed to remove StreamInfo for " + request));
    } else {
      info = Optional.ofNullable(streams.get(key)).orElseThrow(
          () -> new IllegalStateException("Failed to get StreamInfo for " + request));
    }

    final CompletableFuture<Long> localWrite;
    final List<CompletableFuture<DataStreamReply>> remoteWrites;
    if (request.getType() == Type.STREAM_HEADER) {
      localWrite = CompletableFuture.completedFuture(0L);
      remoteWrites = Collections.emptyList();
    } else if (request.getType() == Type.STREAM_DATA) {
      localWrite = info.getLocal().write(request.slice(), request.getWriteOptionList(), writeExecutor);
      remoteWrites = info.applyToRemotes(out -> out.write(request, requestExecutor));
    } else {
      throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
    }

    composeAsync(info.getPrevious(), requestExecutor, n -> JavaUtils.allOf(remoteWrites)
        .thenCombineAsync(localWrite, (v, bytesWritten) -> {
          if (request.getType() == Type.STREAM_HEADER
              || request.getType() == Type.STREAM_DATA
              || close) {
            sendReply(remoteWrites, request, bytesWritten, info.getCommitInfos(), ctx);
          } else {
            throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
          }
          return null;
        }, requestExecutor)).whenComplete((v, exception) -> {
      try {
        if (exception != null) {
          replyDataStreamException(server, exception, info.getRequest(), request, ctx);
          final StreamInfo removed = removeDataStream(key);
          if (removed != null) {
            Preconditions.assertSame(info, removed, "removed");
          } else {
            info.cleanUp(key);
          }
        } else if (close) {
          info.applyToRemotes(remote -> remote.out.closeAsync());
        }
      } finally {
        request.release();
        channels.remove(channelId, key);
      }
    });
  }