AsyncSocket::WriteResult AsyncSocket::sendSocketMessage()

in folly/io/async/AsyncSocket.cpp [3326:3511]


AsyncSocket::WriteResult AsyncSocket::sendSocketMessage(
    const iovec* vec, size_t count, WriteFlags flags) {
  // lambda to gather and merge PrewriteRequests from observers
  auto mergePrewriteRequests = [this,
                                vec,
                                count,
                                flags,
                                maybeVecTotalBytes =
                                    folly::Optional<size_t>()]() mutable {
    AsyncTransport::LifecycleObserver::PrewriteRequest mergedRequest = {};
    if (lifecycleObservers_.empty()) {
      return mergedRequest;
    }

    // determine total number of bytes in vec, reuse once determined
    if (!maybeVecTotalBytes.has_value()) {
      maybeVecTotalBytes = 0;
      for (size_t i = 0; i < count; ++i) {
        maybeVecTotalBytes.value() += vec[i].iov_len;
      }
    }
    auto& vecTotalBytes = maybeVecTotalBytes.value();

    // offsets (within the socket's raw written-byte stream) of the first and
    // last byte that this prospective write would cover
    const auto startOffset = getRawBytesWritten();
    const auto endOffset = getRawBytesWritten() + vecTotalBytes - 1;
    const AsyncTransport::LifecycleObserver::PrewriteState prewriteState = [&] {
      AsyncTransport::LifecycleObserver::PrewriteState state = {};
      state.startOffset = startOffset;
      state.endOffset = endOffset;
      state.writeFlags = flags;
      state.ts = std::chrono::steady_clock::now();
      return state;
    }();
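    // poll each observer that opted into prewrite callbacks and merge their
    // requests: flags to add are OR'd together, and the earliest requested
    // split offset wins (flags at equal offsets are merged)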
    for (const auto& observer : lifecycleObservers_) {
      if (!observer->getConfig().prewrite) {
        continue;
      }

      const auto request = observer->prewrite(this, prewriteState);

      mergedRequest.writeFlagsToAdd |= request.writeFlagsToAdd;
      if (request.maybeOffsetToSplitWrite.has_value()) {
        CHECK_GE(endOffset, request.maybeOffsetToSplitWrite.value());
        if (
            // case 1: offset not set in merged request
            !mergedRequest.maybeOffsetToSplitWrite.has_value() ||
            // case 2: offset in merged request > offset in current request
            mergedRequest.maybeOffsetToSplitWrite >
                request.maybeOffsetToSplitWrite) {
          mergedRequest.maybeOffsetToSplitWrite =
              request.maybeOffsetToSplitWrite; // update
          mergedRequest.writeFlagsToAddAtOffset =
              request.writeFlagsToAddAtOffset; // reset
        } else if (
            // case 3: offset in merged request == offset in current request
            request.maybeOffsetToSplitWrite ==
            mergedRequest.maybeOffsetToSplitWrite) {
          mergedRequest.writeFlagsToAddAtOffset |=
              request.writeFlagsToAddAtOffset; // merge
        }
        // case 4: offset in merged request < offset in current request
        // (do nothing)
      }
    }

    // if maybeOffsetToSplitWrite points to end of the vector, remove the
    // split
    if (mergedRequest.maybeOffsetToSplitWrite.has_value() && // explicit
        mergedRequest.maybeOffsetToSplitWrite == endOffset) {
      mergedRequest.maybeOffsetToSplitWrite.reset(); // no split needed
    }

    return mergedRequest;
  };

  // lambda to prepare and send a message, and handle byte events
  // parameters have L at the end to prevent shadowing warning from gcc
  auto prepSendMsg = [this](
                         const iovec* vecL,
                         const size_t countL,
                         const WriteFlags flagsL) {
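    // timestamp byte events are generated only if they are enabled on this
    // socket and no exception has been recorded by the ByteEventHelper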
    const bool byteEventsEnabled =
        (byteEventHelper_ && byteEventHelper_->byteEventsEnabled &&
         !byteEventHelper_->maybeEx.has_value());

    struct msghdr msg = {};
    msg.msg_name = nullptr;
    msg.msg_namelen = 0;
    msg.msg_iov = const_cast<struct iovec*>(vecL);
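    // cap the iovec count at the platform limit (kIovMax); a single sendmsg
    // call may therefore cover only a prefix of the supplied buffers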
    msg.msg_iovlen = std::min<size_t>(countL, kIovMax);
    msg.msg_flags = 0; // passed to sendSocketMessage below, it sets them
    msg.msg_control = nullptr;
    msg.msg_controllen =
        sendMsgParamCallback_->getAncillaryDataSize(flagsL, byteEventsEnabled);
    CHECK_GE(
        AsyncSocket::SendMsgParamsCallback::maxAncillaryDataSize,
        msg.msg_controllen);

    if (msg.msg_controllen != 0) {
      // size is bounded by maxAncillaryDataSize (checked above), so the stack
      // allocation via alloca is safe
      msg.msg_control = reinterpret_cast<char*>(alloca(msg.msg_controllen));
      sendMsgParamCallback_->getAncillaryData(
          flagsL, msg.msg_control, byteEventsEnabled);
    }

    const auto prewriteRawBytesWritten = getRawBytesWritten();
    int msg_flags = sendMsgParamCallback_->getFlags(flagsL, zeroCopyEnabled_);
    auto writeResult = sendSocketMessage(fd_, &msg, msg_flags);

    if (writeResult.writeReturn < 0 && zeroCopyEnabled_ && errno == ENOBUFS) {
      // workaround for running with zerocopy enabled but without a big enough
      // memlock value - see ulimit -l
      zeroCopyEnabled_ = false;
      zeroCopyReenableCounter_ = zeroCopyReenableThreshold_;
      msg_flags = sendMsgParamCallback_->getFlags(flagsL, zeroCopyEnabled_);
      writeResult = sendSocketMessage(fd_, &msg, msg_flags);
    }

    // on a successful write with WRITE timestamping requested, publish a WRITE
    // byte event (carrying the offset of the last byte written) to every
    // observer subscribed to byte events
    if (writeResult.writeReturn > 0 && byteEventsEnabled &&
        isSet(flagsL, WriteFlags::TIMESTAMP_WRITE)) {
      CHECK_GT(getRawBytesWritten(), prewriteRawBytesWritten); // sanity check
      ByteEvent byteEvent = {};
      byteEvent.type = ByteEvent::Type::WRITE;
      byteEvent.offset = getRawBytesWritten() - 1;
      byteEvent.maybeRawBytesWritten = writeResult.writeReturn;
      byteEvent.maybeRawBytesTriedToWrite = 0;
      for (size_t i = 0; i < countL; ++i) {
        byteEvent.maybeRawBytesTriedToWrite.value() += vecL[i].iov_len;
      }
      byteEvent.maybeWriteFlags = flagsL;
      for (const auto& observer : lifecycleObservers_) {
        if (observer->getConfig().byteEvents) {
          observer->byteEvent(this, byteEvent);
        }
      }
    }

    return writeResult;
  };

  // get PrewriteRequests (if any), merge flags with write flags
  const auto prewriteRequest = mergePrewriteRequests();
  auto mergedFlags = flags | prewriteRequest.writeFlagsToAdd |
      prewriteRequest.writeFlagsToAddAtOffset;

  // if no PrewriteRequests, or none requiring the write to be split, proceed
  if (!prewriteRequest.maybeOffsetToSplitWrite.has_value()) {
    return prepSendMsg(vec, count, mergedFlags);
  }

  // we need to split the write...
  // add CORK flag to inform the OS that more data is on the way...
  mergedFlags |= WriteFlags::CORK;

  // TODO(bschlinker): When prewrite splits a write, try to continue writing
  // after a write returns; this will improve efficiency.
  const auto splitWriteAtOffset = *prewriteRequest.maybeOffsetToSplitWrite;
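  // copy the iovecs covering the bytes up to the split offset into a temporary
  // array: on the stack for small counts, on the heap otherwise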
  if (count <= kSmallIoVecSize) {
    // suppress "warning: variable length array 'vec' is used [-Wvla]"
    FOLLY_PUSH_WARNING
    FOLLY_GNU_DISABLE_WARNING("-Wvla")
    iovec tmpVec[BOOST_PP_IF(FOLLY_HAVE_VLA_01, count, kSmallIoVecSize)];
    FOLLY_POP_WARNING

    size_t tmpVecCount = count;
    splitIovecArray(
        0,
        splitWriteAtOffset - getRawBytesWritten(),
        vec,
        count,
        tmpVec,
        tmpVecCount);
    return prepSendMsg(tmpVec, tmpVecCount, mergedFlags);
  } else {
    auto tmpVecPtr = std::make_unique<iovec[]>(count);
    auto tmpVec = tmpVecPtr.get();
    size_t tmpVecCount = count;
    splitIovecArray(
        0,
        splitWriteAtOffset - getRawBytesWritten(),
        vec,
        count,
        tmpVec,
        tmpVecCount);
    return prepSendMsg(tmpVec, tmpVecCount, mergedFlags);
  }
}
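
For reference, the sketch below shows how an observer's prewrite callback might ask for a split. It is a minimal illustration only: it relies solely on the PrewriteState/PrewriteRequest fields visible in the listing above (startOffset, endOffset, maybeOffsetToSplitWrite, writeFlagsToAddAtOffset) plus WriteFlags::TIMESTAMP_WRITE; the helper name makeSplitRequest is invented, and the exact class nesting, header layout, and observer registration details can differ between folly versions.

// Sketch only: build a PrewriteRequest that asks sendSocketMessage() to split
// the pending write at `targetOffset` and to tag the first chunk with
// TIMESTAMP_WRITE, so a WRITE byte event is delivered once the split chunk is
// written. Types are assumed to match the listing above.
#include <folly/io/async/AsyncTransport.h>

#include <cstdint>

using Observer = folly::AsyncTransport::LifecycleObserver;

Observer::PrewriteRequest makeSplitRequest(
    const Observer::PrewriteState& state, uint64_t targetOffset) {
  Observer::PrewriteRequest request = {};
  // act only if the target offset falls inside this write's byte range;
  // sendSocketMessage() CHECKs that a requested split offset is <= endOffset
  if (state.startOffset <= targetOffset && targetOffset <= state.endOffset) {
    request.maybeOffsetToSplitWrite = targetOffset;
    request.writeFlagsToAddAtOffset = folly::WriteFlags::TIMESTAMP_WRITE;
  }
  return request;
}

An observer would typically return such a request from its prewrite() override (invoked with the transport and the PrewriteState, as in the merge loop above) after enabling the prewrite option in its observer config. Note that if the requested offset equals endOffset, the listing drops the split and simply applies the extra flags to the whole write.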