in folly/io/async/AsyncSocket.cpp [3326:3511]
/**
 * Sends the iovec array `vec` (`count` entries) on this socket with sendmsg,
 * first giving lifecycle observers a chance to inspect and shape the write.
 *
 * Observers whose config enables `prewrite` receive a PrewriteState
 * describing the absolute byte range of this write. Each can return a
 * PrewriteRequest that:
 *  - adds write flags for the whole write (`writeFlagsToAdd`), and/or
 *  - asks for the write to be split at an absolute byte offset
 *    (`maybeOffsetToSplitWrite`), with extra flags for that portion
 *    (`writeFlagsToAddAtOffset`).
 *
 * When a split is requested, this call only sends the bytes of `vec` up to
 * the split offset, with WriteFlags::CORK added. The remaining bytes are not
 * sent by this call.
 *
 * If byte events are enabled and the effective flags include
 * WriteFlags::TIMESTAMP_WRITE, observers with `byteEvents` enabled are
 * notified of a WRITE ByteEvent after a successful (positive) write.
 *
 * @param vec   array of buffers to send
 * @param count number of entries in `vec`
 * @param flags caller-supplied write flags
 * @return the WriteResult of the underlying sendSocketMessage(fd_, ...) call
 */
AsyncSocket::WriteResult AsyncSocket::sendSocketMessage(
    const iovec* vec, size_t count, WriteFlags flags) {
  // Gathers PrewriteRequests from observers with prewrite enabled and merges
  // them into a single request:
  //  - writeFlagsToAdd from every observer are OR'd together;
  //  - the smallest requested split offset wins, and writeFlagsToAddAtOffset
  //    come from the observer(s) that requested exactly that offset.
  auto mergePrewriteRequests = [this, vec, count, flags]() {
    AsyncTransport::LifecycleObserver::PrewriteRequest mergedRequest = {};
    if (lifecycleObservers_.empty()) {
      return mergedRequest;
    }
    // total number of bytes in vec (only needed when observers exist)
    size_t vecTotalBytes = 0;
    for (size_t i = 0; i < count; ++i) {
      vecTotalBytes += vec[i].iov_len;
    }
    // absolute offsets of the first and last byte covered by this write
    // NOTE(review): if vecTotalBytes == 0, endOffset ends up below
    // startOffset; presumably callers never issue empty writes — confirm.
    const auto startOffset = getRawBytesWritten();
    const auto endOffset = startOffset + vecTotalBytes - 1;
    const AsyncTransport::LifecycleObserver::PrewriteState prewriteState = [&] {
      AsyncTransport::LifecycleObserver::PrewriteState state = {};
      state.startOffset = startOffset;
      state.endOffset = endOffset;
      state.writeFlags = flags;
      state.ts = std::chrono::steady_clock::now();
      return state;
    }();
    for (const auto& observer : lifecycleObservers_) {
      if (!observer->getConfig().prewrite) {
        continue;
      }
      const auto request = observer->prewrite(this, prewriteState);
      mergedRequest.writeFlagsToAdd |= request.writeFlagsToAdd;
      if (request.maybeOffsetToSplitWrite.has_value()) {
        // the split offset must lie within [startOffset, endOffset]; an
        // offset before startOffset would underflow the relative split
        // offset computed when the iovec array is split below
        CHECK_GE(request.maybeOffsetToSplitWrite.value(), startOffset);
        CHECK_GE(endOffset, request.maybeOffsetToSplitWrite.value());
        if (
            // case 1: offset not set in merged request
            !mergedRequest.maybeOffsetToSplitWrite.has_value() ||
            // case 2: offset in merged request > offset in current request
            mergedRequest.maybeOffsetToSplitWrite >
                request.maybeOffsetToSplitWrite) {
          mergedRequest.maybeOffsetToSplitWrite =
              request.maybeOffsetToSplitWrite; // update
          mergedRequest.writeFlagsToAddAtOffset =
              request.writeFlagsToAddAtOffset; // reset
        } else if (
            // case 3: offset in merged request == offset in current request
            request.maybeOffsetToSplitWrite ==
            mergedRequest.maybeOffsetToSplitWrite) {
          mergedRequest.writeFlagsToAddAtOffset |=
              request.writeFlagsToAddAtOffset; // merge
        }
        // case 4: offset in merged request < offset in current request
        // (do nothing)
      }
    }
    // if maybeOffsetToSplitWrite points to the last byte of the vector, the
    // whole write is sent anyway, so remove the split
    if (mergedRequest.maybeOffsetToSplitWrite.has_value() && // explicit
        mergedRequest.maybeOffsetToSplitWrite == endOffset) {
      mergedRequest.maybeOffsetToSplitWrite.reset(); // no split needed
    }
    return mergedRequest;
  };

  // Builds a msghdr for (vecL, countL), sends it, and emits a WRITE ByteEvent
  // to interested observers when applicable.
  // parameters have L at the end to prevent shadowing warning from gcc
  auto prepSendMsg = [this](
                         const iovec* vecL,
                         const size_t countL,
                         const WriteFlags flagsL) {
    const bool byteEventsEnabled =
        (byteEventHelper_ && byteEventHelper_->byteEventsEnabled &&
         !byteEventHelper_->maybeEx.has_value());
    // only the first iovCount iovecs are handed to sendmsg
    const size_t iovCount = std::min<size_t>(countL, kIovMax);
    struct msghdr msg = {};
    msg.msg_name = nullptr;
    msg.msg_namelen = 0;
    msg.msg_iov = const_cast<struct iovec*>(vecL);
    msg.msg_iovlen = iovCount;
    msg.msg_flags = 0; // passed to sendSocketMessage below, it sets them
    msg.msg_control = nullptr;
    msg.msg_controllen =
        sendMsgParamCallback_->getAncillaryDataSize(flagsL, byteEventsEnabled);
    CHECK_GE(
        AsyncSocket::SendMsgParamsCallback::maxAncillaryDataSize,
        msg.msg_controllen);
    if (msg.msg_controllen != 0) {
      // ancillary data lives on this lambda's stack frame for the send
      msg.msg_control = reinterpret_cast<char*>(alloca(msg.msg_controllen));
      sendMsgParamCallback_->getAncillaryData(
          flagsL, msg.msg_control, byteEventsEnabled);
    }
    const auto prewriteRawBytesWritten = getRawBytesWritten();
    int msg_flags = sendMsgParamCallback_->getFlags(flagsL, zeroCopyEnabled_);
    auto writeResult = sendSocketMessage(fd_, &msg, msg_flags);
    if (writeResult.writeReturn < 0 && zeroCopyEnabled_ && errno == ENOBUFS) {
      // workaround for running with zerocopy enabled but without a big enough
      // memlock value - see ulimit -l
      zeroCopyEnabled_ = false;
      zeroCopyReenableCounter_ = zeroCopyReenableThreshold_;
      msg_flags = sendMsgParamCallback_->getFlags(flagsL, zeroCopyEnabled_);
      writeResult = sendSocketMessage(fd_, &msg, msg_flags);
    }
    if (writeResult.writeReturn > 0 && byteEventsEnabled &&
        isSet(flagsL, WriteFlags::TIMESTAMP_WRITE)) {
      CHECK_GT(getRawBytesWritten(), prewriteRawBytesWritten); // sanity check
      ByteEvent byteEvent = {};
      byteEvent.type = ByteEvent::Type::WRITE;
      byteEvent.offset = getRawBytesWritten() - 1;
      byteEvent.maybeRawBytesWritten = writeResult.writeReturn;
      // count only the bytes actually handed to sendmsg (the first iovCount
      // iovecs), not every entry of vecL
      byteEvent.maybeRawBytesTriedToWrite = 0;
      for (size_t i = 0; i < iovCount; ++i) {
        byteEvent.maybeRawBytesTriedToWrite.value() += vecL[i].iov_len;
      }
      byteEvent.maybeWriteFlags = flagsL;
      for (const auto& observer : lifecycleObservers_) {
        if (observer->getConfig().byteEvents) {
          observer->byteEvent(this, byteEvent);
        }
      }
    }
    return writeResult;
  };

  // get PrewriteRequests (if any), merge flags with write flags
  const auto prewriteRequest = mergePrewriteRequests();
  auto mergedFlags = flags | prewriteRequest.writeFlagsToAdd |
      prewriteRequest.writeFlagsToAddAtOffset;
  // if no PrewriteRequests, or none requiring the write to be split, proceed
  if (!prewriteRequest.maybeOffsetToSplitWrite.has_value()) {
    return prepSendMsg(vec, count, mergedFlags);
  }

  // we need to split the write...
  // add CORK flag to inform the OS that more data is on the way...
  mergedFlags |= WriteFlags::CORK;
  // TODO(bschlinker): When prewrite splits a write, try to continue writing
  // after a write returns; this will improve efficiency.
  const auto splitWriteAtOffset = *prewriteRequest.maybeOffsetToSplitWrite;
  // split offset relative to the first byte of vec; mergePrewriteRequests
  // CHECKs that splitWriteAtOffset >= getRawBytesWritten()
  const auto relativeSplitOffset = splitWriteAtOffset - getRawBytesWritten();

  // Copies the portion of vec up to the split offset into tmpVec (which
  // must have room for `count` entries) and sends it.
  auto splitAndSend = [&](iovec* tmpVec) {
    size_t tmpVecCount = count;
    splitIovecArray(0, relativeSplitOffset, vec, count, tmpVec, tmpVecCount);
    return prepSendMsg(tmpVec, tmpVecCount, mergedFlags);
  };

  if (count <= kSmallIoVecSize) {
    // suppress "warning: variable length array 'vec' is used [-Wvla]"
    FOLLY_PUSH_WARNING
    FOLLY_GNU_DISABLE_WARNING("-Wvla")
    iovec tmpVec[BOOST_PP_IF(FOLLY_HAVE_VLA_01, count, kSmallIoVecSize)];
    FOLLY_POP_WARNING
    return splitAndSend(tmpVec);
  }
  auto tmpVecPtr = std::make_unique<iovec[]>(count);
  return splitAndSend(tmpVecPtr.get());
}