void RocketServerConnection::handleSinkFrame()

in thrift/lib/cpp2/transport/rocket/server/RocketServerConnection.cpp [538:662]


void RocketServerConnection::handleSinkFrame(
    std::unique_ptr<folly::IOBuf> frame,
    StreamId streamId,
    FrameType frameType,
    Flags flags,
    folly::io::Cursor cursor,
    RocketSinkClientCallback& clientCallback) {
  if (!clientCallback.serverCallbackReady()) {
    switch (frameType) {
      case FrameType::ERROR: {
        ErrorFrame errorFrame{std::move(frame)};
        if (errorFrame.errorCode() == ErrorCode::CANCELED) {
          return clientCallback.earlyCancelled();
        }
      }
      default:
        return close(folly::make_exception_wrapper<RocketException>(
            ErrorCode::INVALID,
            fmt::format(
                "Received unexpected early frame, stream id ({}) type ({})",
                static_cast<uint32_t>(streamId),
                static_cast<uint8_t>(frameType))));
    }
  }

  auto handleSinkPayload = [&](PayloadFrame&& payloadFrame) {
    const bool next = payloadFrame.hasNext();
    const bool complete = payloadFrame.hasComplete();
    if (auto fullPayload = bufferOrGetFullPayload(std::move(payloadFrame))) {
      bool notViolateContract = true;
      if (next) {
        auto streamPayload =
            rocket::unpack<StreamPayload>(std::move(*fullPayload));
        if (streamPayload.hasException()) {
          notViolateContract =
              clientCallback.onSinkError(std::move(streamPayload.exception()));
          if (notViolateContract) {
            freeStream(streamId, true);
          }
        } else {
          auto payloadMetadataRef =
              streamPayload->metadata.payloadMetadata_ref();
          if (payloadMetadataRef &&
              payloadMetadataRef->getType() ==
                  PayloadMetadata::exceptionMetadata) {
            notViolateContract = clientCallback.onSinkError(
                apache::thrift::detail::EncodedStreamError(
                    std::move(streamPayload.value())));
            if (notViolateContract) {
              freeStream(streamId, true);
            }
          } else {
            notViolateContract =
                clientCallback.onSinkNext(std::move(*streamPayload));
          }
        }
      }

      if (complete) {
        // it is possible final repsonse(error) sent from serverCallback,
        // serverCallback may be already destoryed.
        if (streams_.find(streamId) != streams_.end()) {
          notViolateContract = clientCallback.onSinkComplete();
        }
      }

      if (!notViolateContract) {
        close(folly::make_exception_wrapper<transport::TTransportException>(
            transport::TTransportException::TTransportExceptionType::
                STREAMING_CONTRACT_VIOLATION,
            "receiving sink payload frame after sink completion"));
      }
    }
  };

  switch (frameType) {
    case FrameType::PAYLOAD: {
      PayloadFrame payloadFrame(streamId, flags, cursor, std::move(frame));
      handleSinkPayload(std::move(payloadFrame));
    } break;

    case FrameType::ERROR: {
      ErrorFrame errorFrame{std::move(frame)};
      auto ew = [&] {
        if (errorFrame.errorCode() == ErrorCode::CANCELED) {
          return folly::make_exception_wrapper<TApplicationException>(
              TApplicationException::TApplicationExceptionType::INTERRUPTION);
        } else {
          return folly::make_exception_wrapper<RocketException>(
              errorFrame.errorCode(), std::move(errorFrame.payload()).data());
        }
      }();

      bool notViolateContract = clientCallback.onSinkError(std::move(ew));
      if (notViolateContract) {
        freeStream(streamId, true);
      } else {
        close(folly::make_exception_wrapper<transport::TTransportException>(
            transport::TTransportException::TTransportExceptionType::
                STREAMING_CONTRACT_VIOLATION,
            "receiving sink error frame after sink completion"));
      }
    } break;

    case FrameType::EXT: {
      ExtFrame extFrame(streamId, flags, cursor, std::move(frame));
      auto extFrameType = extFrame.extFrameType();
      if (extFrameType == ExtFrameType::ALIGNED_PAGE ||
          extFrameType == ExtFrameType::CUSTOM_ALLOC) {
        PayloadFrame payloadFrame(
            streamId, std::move(extFrame.payload()), flags);
        handleSinkPayload(std::move(payloadFrame));
        break;
      }
    }

    default:
      close(folly::make_exception_wrapper<RocketException>(
          ErrorCode::INVALID,
          fmt::format(
              "Received unhandleable frame type ({}) for sink (id {})",
              static_cast<uint8_t>(frameType),
              static_cast<uint32_t>(streamId))));
  }
}