in fizz/protocol/AsyncFizzBase.cpp [241:308]
void AsyncFizzBase::deliverAppData(std::unique_ptr<folly::IOBuf> data) {
if (data) {
appBytesReceived_ += data->computeChainDataLength();
}
if (appDataBuf_) {
if (data) {
appDataBuf_->prependChain(std::move(data));
}
data = std::move(appDataBuf_);
}
while (readCallback_ && data) {
if (readCallback_->isBufferMovable()) {
return readCallback_->readBufferAvailable(std::move(data));
} else {
folly::io::Cursor cursor(data.get());
size_t available = 0;
while ((available = cursor.totalLength()) != 0 && readCallback_ &&
!readCallback_->isBufferMovable()) {
void* buf = nullptr;
size_t buflen = 0;
try {
readCallback_->getReadBuffer(&buf, &buflen);
} catch (const AsyncSocketException& ase) {
return deliverError(ase);
} catch (const std::exception& e) {
AsyncSocketException ase(
AsyncSocketException::BAD_ARGS,
folly::to<std::string>("getReadBuffer() threw ", e.what()));
return deliverError(ase);
} catch (...) {
AsyncSocketException ase(
AsyncSocketException::BAD_ARGS,
"getReadBuffer() threw unknown exception");
return deliverError(ase);
}
if (buflen == 0 || buf == nullptr) {
AsyncSocketException ase(
AsyncSocketException::BAD_ARGS,
"getReadBuffer() returned empty buffer");
return deliverError(ase);
}
size_t bytesToRead = std::min(buflen, available);
cursor.pull(buf, bytesToRead);
readCallback_->readDataAvailable(bytesToRead);
}
// If we have data left, it means the read callback changed and we need
// to save the remaining data (if any)
if (available != 0) {
std::unique_ptr<folly::IOBuf> remainingData;
cursor.clone(remainingData, available);
data = std::move(remainingData);
} else {
// Out of data. Reset the data pointer to end the loop
data.reset();
}
}
}
if (data) {
appDataBuf_ = std::move(data);
}
checkBufLen();
}