in ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java [433:500]
private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputImpl>, IOException> getStreams) {
final boolean close = request.getWriteOptionList().contains(StandardWriteOption.CLOSE);
ClientInvocationId key = ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
// add to ChannelMap
final ChannelId channelId = ctx.channel().id();
channels.add(channelId, key);
final StreamInfo info;
if (request.getType() == Type.STREAM_HEADER) {
final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(
() -> newStreamInfo(request.slice(), getStreams));
info = streams.computeIfAbsent(key, id -> supplier.get());
if (!supplier.isInitialized()) {
throw new IllegalStateException("Failed to create a new stream for " + request
+ " since a stream already exists Key: " + key + " StreamInfo:" + info);
}
getMetrics().onRequestCreate(RequestType.HEADER);
} else if (close) {
info = Optional.ofNullable(streams.remove(key)).orElseThrow(
() -> new IllegalStateException("Failed to remove StreamInfo for " + request));
} else {
info = Optional.ofNullable(streams.get(key)).orElseThrow(
() -> new IllegalStateException("Failed to get StreamInfo for " + request));
}
final CompletableFuture<Long> localWrite;
final List<CompletableFuture<DataStreamReply>> remoteWrites;
if (request.getType() == Type.STREAM_HEADER) {
localWrite = CompletableFuture.completedFuture(0L);
remoteWrites = Collections.emptyList();
} else if (request.getType() == Type.STREAM_DATA) {
localWrite = info.getLocal().write(request.slice(), request.getWriteOptionList(), writeExecutor);
remoteWrites = info.applyToRemotes(out -> out.write(request, requestExecutor));
} else {
throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
}
composeAsync(info.getPrevious(), requestExecutor, n -> JavaUtils.allOf(remoteWrites)
.thenCombineAsync(localWrite, (v, bytesWritten) -> {
if (request.getType() == Type.STREAM_HEADER
|| request.getType() == Type.STREAM_DATA
|| close) {
sendReply(remoteWrites, request, bytesWritten, info.getCommitInfos(), ctx);
} else {
throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
}
return null;
}, requestExecutor)).whenComplete((v, exception) -> {
try {
if (exception != null) {
replyDataStreamException(server, exception, info.getRequest(), request, ctx);
final StreamInfo removed = removeDataStream(key);
if (removed != null) {
Preconditions.assertSame(info, removed, "removed");
} else {
info.cleanUp(key);
}
} else if (close) {
info.applyToRemotes(remote -> remote.out.closeAsync());
}
} finally {
request.release();
channels.remove(channelId, key);
}
});
}