in thrift/lib/cpp2/transport/rocket/server/RocketServerConnection.cpp [538:662]
/**
 * Dispatches one inbound frame received for a sink to `clientCallback`.
 *
 * While `clientCallback.serverCallbackReady()` is false, the only frame that
 * is accepted is an ERROR frame with ErrorCode::CANCELED, which is reported
 * via `earlyCancelled()`. Every other early frame closes the connection with
 * ErrorCode::INVALID.
 *
 * Once the server callback is ready:
 *  - PAYLOAD frames, and EXT frames of type ALIGNED_PAGE or CUSTOM_ALLOC, are
 *    processed as sink payloads (next / error / complete).
 *  - ERROR frames are forwarded through `onSinkError()`.
 *  - Any other frame type closes the connection with ErrorCode::INVALID.
 *
 * If any sink callback returns false, the connection is closed with
 * STREAMING_CONTRACT_VIOLATION.
 *
 * @param frame          Frame buffer; moved into the frame object that is
 *                       constructed for it.
 * @param streamId       Id of the stream the frame belongs to.
 * @param frameType      Type of the frame, used for dispatch.
 * @param flags          Frame flags, forwarded to the PayloadFrame/ExtFrame
 *                       constructors.
 * @param cursor         Cursor forwarded to the PayloadFrame/ExtFrame
 *                       constructors (presumably positioned inside `frame` —
 *                       confirm against the caller).
 * @param clientCallback Callback that receives the sink events.
 */
void RocketServerConnection::handleSinkFrame(
    std::unique_ptr<folly::IOBuf> frame,
    StreamId streamId,
    FrameType frameType,
    Flags flags,
    folly::io::Cursor cursor,
    RocketSinkClientCallback& clientCallback) {
  if (!clientCallback.serverCallbackReady()) {
    switch (frameType) {
      case FrameType::ERROR: {
        ErrorFrame errorFrame{std::move(frame)};
        if (errorFrame.errorCode() == ErrorCode::CANCELED) {
          return clientCallback.earlyCancelled();
        }
      }
        // A non-CANCELED early error is rejected like any other early frame.
        [[fallthrough]];
      default:
        return close(folly::make_exception_wrapper<RocketException>(
            ErrorCode::INVALID,
            fmt::format(
                "Received unexpected early frame, stream id ({}) type ({})",
                static_cast<uint32_t>(streamId),
                static_cast<uint8_t>(frameType))));
    }
  }

  // Shared by PAYLOAD frames and payload-carrying EXT frames.
  auto handleSinkPayload = [&](PayloadFrame&& payloadFrame) {
    // Read the flags before the frame is moved into bufferOrGetFullPayload().
    const bool next = payloadFrame.hasNext();
    const bool complete = payloadFrame.hasComplete();
    // Nothing is delivered unless a full payload is returned.
    if (auto fullPayload = bufferOrGetFullPayload(std::move(payloadFrame))) {
      bool notViolateContract = true;
      if (next) {
        auto streamPayload =
            rocket::unpack<StreamPayload>(std::move(*fullPayload));
        if (streamPayload.hasException()) {
          notViolateContract =
              clientCallback.onSinkError(std::move(streamPayload.exception()));
          if (notViolateContract) {
            freeStream(streamId, true);
          }
        } else {
          auto payloadMetadataRef =
              streamPayload->metadata.payloadMetadata_ref();
          if (payloadMetadataRef &&
              payloadMetadataRef->getType() ==
                  PayloadMetadata::exceptionMetadata) {
            // The payload itself encodes an exception: deliver it as an error.
            notViolateContract = clientCallback.onSinkError(
                apache::thrift::detail::EncodedStreamError(
                    std::move(streamPayload.value())));
            if (notViolateContract) {
              freeStream(streamId, true);
            }
          } else {
            notViolateContract =
                clientCallback.onSinkNext(std::move(*streamPayload));
          }
        }
      }
      if (complete) {
        // It is possible that a final response (error) was already sent from
        // the serverCallback, in which case the serverCallback may already be
        // destroyed; only notify completion if the stream is still registered.
        if (streams_.find(streamId) != streams_.end()) {
          // Accumulate instead of overwriting so that a violation detected
          // while handling the `next` part of this frame is never masked by
          // the result of onSinkComplete().
          const bool completeAccepted = clientCallback.onSinkComplete();
          notViolateContract = notViolateContract && completeAccepted;
        }
      }
      if (!notViolateContract) {
        close(folly::make_exception_wrapper<transport::TTransportException>(
            transport::TTransportException::TTransportExceptionType::
                STREAMING_CONTRACT_VIOLATION,
            "receiving sink payload frame after sink completion"));
      }
    }
  };

  switch (frameType) {
    case FrameType::PAYLOAD: {
      PayloadFrame payloadFrame(streamId, flags, cursor, std::move(frame));
      handleSinkPayload(std::move(payloadFrame));
    } break;
    case FrameType::ERROR: {
      ErrorFrame errorFrame{std::move(frame)};
      // CANCELED is surfaced as an INTERRUPTION application exception; every
      // other error code is wrapped in a RocketException with its payload.
      auto ew = [&] {
        if (errorFrame.errorCode() == ErrorCode::CANCELED) {
          return folly::make_exception_wrapper<TApplicationException>(
              TApplicationException::TApplicationExceptionType::INTERRUPTION);
        } else {
          return folly::make_exception_wrapper<RocketException>(
              errorFrame.errorCode(), std::move(errorFrame.payload()).data());
        }
      }();
      bool notViolateContract = clientCallback.onSinkError(std::move(ew));
      if (notViolateContract) {
        freeStream(streamId, true);
      } else {
        close(folly::make_exception_wrapper<transport::TTransportException>(
            transport::TTransportException::TTransportExceptionType::
                STREAMING_CONTRACT_VIOLATION,
            "receiving sink error frame after sink completion"));
      }
    } break;
    case FrameType::EXT: {
      ExtFrame extFrame(streamId, flags, cursor, std::move(frame));
      auto extFrameType = extFrame.extFrameType();
      if (extFrameType == ExtFrameType::ALIGNED_PAGE ||
          extFrameType == ExtFrameType::CUSTOM_ALLOC) {
        // These EXT frames carry a regular payload; reuse the payload path.
        PayloadFrame payloadFrame(
            streamId, std::move(extFrame.payload()), flags);
        handleSinkPayload(std::move(payloadFrame));
        break;
      }
    }
      // Any other EXT frame type is rejected as an unhandleable frame.
      [[fallthrough]];
    default:
      close(folly::make_exception_wrapper<RocketException>(
          ErrorCode::INVALID,
          fmt::format(
              "Received unhandleable frame type ({}) for sink (id {})",
              static_cast<uint8_t>(frameType),
              static_cast<uint32_t>(streamId))));
  }
}