public void write()

in statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyHandler.java [57:88]


  /**
   * Intercepts outbound {@link NettyRequest} messages and forwards an HTTP request on their behalf.
   *
   * <p>Messages of any other type are delegated unchanged to {@code super.write}. For a {@link
   * NettyRequest}:
   *
   * <ul>
   *   <li>if another request is still in flight, the new request is completed exceptionally with
   *       an {@link IllegalStateException} and the same cause is passed to {@code exceptionCaught};
   *   <li>otherwise the request is remembered as the in-flight request, {@code
   *       request.toFunction()} is serialized into a buffer obtained from the channel's allocator,
   *       the buffer is passed to {@code writeHttpRequest}, and a timeout is scheduled using the
   *       request's remaining budget.
   * </ul>
   *
   * <p>Any {@link Throwable} raised while serializing, writing or scheduling the timeout is routed
   * to {@code exceptionCaught} instead of being propagated to the caller.
   *
   * @param ctx the handler context used for allocation, writing and scheduling
   * @param msg the outbound message; only {@link NettyRequest} instances are handled here
   * @param promise passed to {@code super.write} for messages that are not a {@link NettyRequest};
   *     NOTE(review): it is never completed by this method on the {@link NettyRequest} path --
   *     confirm that callers do not wait on it
   * @throws Exception if {@code super.write} or {@code exceptionCaught} throws
   */
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
      throws Exception {
    if (!(msg instanceof NettyRequest)) {
      super.write(ctx, msg, promise);
      return;
    }
    final NettyRequest request = (NettyRequest) msg;
    if (inflightRequest != null) {
      // this is a BUG: sending a new request while an old request is in progress.
      // we fail both of these requests.
      IllegalStateException cause =
          new IllegalStateException("A Channel has not finished the previous request.");
      request.completeAttemptExceptionally(cause);
      exceptionCaught(ctx, cause);
      return;
    }
    this.inflightRequest = request;
    // a new NettyRequest was introduced into the pipeline.
    // we remember that request and forward an HTTP request on its behalf.
    // from now on, every exception thrown during the processing of this pipeline, either during
    // the following section or during read(), will be caught and delivered to the inflightRequest
    // via #exceptionCaught().
    ByteBuf content = null;
    try {
      content = serializeProtobuf(ctx.channel().alloc()::buffer, request.toFunction());
      writeHttpRequest(ctx, content, request);
      // On the success path nothing in this method releases the buffer, so a normal return from
      // writeHttpRequest means the buffer has been handed off. Drop our reference so that a
      // failure while scheduling the timeout does not release a buffer we no longer own.
      content = null;
      scheduleRequestTimeout(ctx, request.remainingRequestBudgetNanos());
    } catch (Throwable t) {
      if (content != null) {
        // the failure happened before the hand-off completed, so the buffer is still ours.
        ReferenceCountUtil.safeRelease(content);
      }
      exceptionCaught(ctx, t);
    }
  }