in statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyHandler.java [57:88]
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
if (!(msg instanceof NettyRequest)) {
super.write(ctx, msg, promise);
return;
}
final NettyRequest request = (NettyRequest) msg;
if (inflightRequest != null) {
// this is a BUG: sending new request while an old request is in progress.
// we fail both of these requests.
IllegalStateException cause =
new IllegalStateException("A Channel has not finished the previous request.");
request.completeAttemptExceptionally(cause);
exceptionCaught(ctx, cause);
return;
}
this.inflightRequest = request;
// a new NettyRequestReply was introduced into the pipeline.
// we remember that request and forward an HTTP request on its behalf upstream.
// from now on, every exception thrown during the processing of this pipeline, either during the
// following section or
// during read(), will be caught and delivered to the @inFlightRequest via #exceptionCaught().
ByteBuf content = null;
try {
content = serializeProtobuf(ctx.channel().alloc()::buffer, request.toFunction());
writeHttpRequest(ctx, content, request);
scheduleRequestTimeout(ctx, request.remainingRequestBudgetNanos());
} catch (Throwable t) {
ReferenceCountUtil.safeRelease(content);
exceptionCaught(ctx, t);
}
}