in folly/io/async/AsyncSocket.cpp [1621:1766]
/**
 * Core write path: attempt to send the given iovec array immediately when
 * the socket allows it, and queue a WriteRequest for anything left over.
 *
 * Outcomes, in the order they are checked:
 *  - the write side is shut down (or shutdown is pending): invalidState();
 *  - nothing else is queued and the socket is ESTABLISHED/FAST_OPEN and not
 *    connecting: performWrite() is tried right away, and on failure
 *    failWrite() is invoked with 0 bytes written, while on a complete write
 *    callback->writeSuccess() is invoked and the function returns;
 *  - the socket is neither writable nor connecting: invalidState();
 *  - otherwise the remaining data is wrapped in a BytesWriteRequest and
 *    appended to the pending-write list.
 *
 * @param callback    Completion callback; may be null.
 * @param vec         iovec array to write.
 * @param count       Number of entries in vec.
 * @param buf         IOBuf chain whose ownership is taken by this call; may
 *                    be null.
 * @param totalBytes  Amount added to totalAppBytesScheduledForWrite_.
 * @param flags       Flags forwarded to performWrite() and to the queued
 *                    request.
 */
void AsyncSocket::writeImpl(
    WriteCallback* callback,
    const iovec* vec,
    size_t count,
    unique_ptr<IOBuf>&& buf,
    size_t totalBytes,
    WriteFlags flags) {
  VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
          << ", callback=" << callback << ", count=" << count
          << ", state=" << state_;
  DestructorGuard dg(this);
  unique_ptr<IOBuf> ownedBuf(std::move(buf));
  eventBase_->dcheckIsInEventBaseThread();

  auto* releaseCb = callback ? callback->getReleaseIOBufCallback() : nullptr;

  // Whatever is still held in ownedBuf when this function exits, on any
  // path, is handed to releaseIOBuf().
  SCOPE_EXIT { releaseIOBuf(std::move(ownedBuf), releaseCb); };

  // Accounting happens up front, before any of the early exits below.
  totalAppBytesScheduledForWrite_ += totalBytes;
  if (ownedBuf) {
    allocatedBytesBuffered_ += ownedBuf->computeChainCapacity();
  }

  // Writing after the write side has been shut down is treated as a caller
  // bug. Instead of failing only this write via callback->writeError(),
  // invalidState() fails every outstanding callback and moves the socket
  // into the error state.
  if ((shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) != 0) {
    return invalidState(callback);
  }

  uint32_t iovsDone = 0; // iovec entries fully written by performWrite()
  uint32_t partialBytes = 0; // bytes written from the first unfinished entry
  ssize_t sent = 0; // return value of the immediate write attempt
  bool wantsWriteEvent = false;

  const bool writableState =
      state_ == StateEnum::ESTABLISHED || state_ == StateEnum::FAST_OPEN;
  if (writableState && !connecting()) {
    if (writeReqHead_ == nullptr) {
      // Nothing is queued ahead of us, so try to write straight away.
      assert(writeReqTail_ == nullptr);
      assert((eventFlags_ & EventHandler::WRITE) == 0);

      auto outcome = performWrite(
          vec, static_cast<uint32_t>(count), flags, &iovsDone, &partialBytes);
      sent = outcome.writeReturn;

      if (sent < 0) {
        // Capture errno first, before any further call can overwrite it.
        auto savedErrno = errno;
        if (outcome.exception) {
          return failWrite(__func__, callback, 0, *outcome.exception);
        }
        AsyncSocketException writeEx(
            AsyncSocketException::INTERNAL_ERROR,
            withAddr("writev failed"),
            savedErrno);
        return failWrite(__func__, callback, 0, writeEx);
      }

      if (iovsDone == count) {
        // Everything went out. Hand the whole buffer to the zero-copy
        // bookkeeping, or release it right now.
        if (iovsDone && isZeroCopyRequest(flags)) {
          addZeroCopyBuf(std::move(ownedBuf), releaseCb);
        } else {
          releaseIOBuf(std::move(ownedBuf), releaseCb);
        }
        if (callback) {
          callback->writeSuccess();
        }
        return;
      }

      // Partial write: the rest goes through a queued request. For a
      // zero-copy request only the raw buffer pointer is recorded here.
      if (sent && isZeroCopyRequest(flags)) {
        addZeroCopyBuf(ownedBuf.get());
      }

      // The write may have put the socket back into the connecting state
      // (TFO enabled and the TFO attempt failed). In that case write
      // timeouts are not active, although connect timeouts still cover this
      // stage, so only register for write events when not connecting.
      wantsWriteEvent = !connecting();
    }
  } else if (!connecting()) {
    // Neither writable nor connecting: invalid state for writing.
    return invalidState(callback);
  }

  // Wrap whatever was not written in a request and put it on the queue.
  WriteRequest* pending = nullptr;
  try {
    pending = BytesWriteRequest::newRequest(
        this,
        callback,
        vec + iovsDone,
        static_cast<uint32_t>(count - iovsDone),
        partialBytes,
        static_cast<uint32_t>(sent),
        std::move(ownedBuf),
        flags);
  } catch (const std::exception& cause) {
    // std::bad_alloc is the failure mainly expected here.
    AsyncSocketException allocEx(
        AsyncSocketException::INTERNAL_ERROR,
        withAddr(
            string("failed to append new WriteRequest: ") + cause.what()));
    return failWrite(__func__, callback, static_cast<size_t>(sent), allocEx);
  }
  pending->consume();

  if (writeReqTail_ != nullptr) {
    writeReqTail_->append(pending);
  } else {
    assert(writeReqHead_ == nullptr);
    writeReqHead_ = pending;
  }
  writeReqTail_ = pending;

  if (bufferCallback_) {
    bufferCallback_->onEgressBuffered();
  }

  if (!wantsWriteEvent) {
    return;
  }

  // We are established and not yet waiting on write events: register now.
  assert(state_ == StateEnum::ESTABLISHED);
  assert((eventFlags_ & EventHandler::WRITE) == 0);
  if (!updateEventRegistration(EventHandler::WRITE, 0)) {
    assert(state_ == StateEnum::ERROR);
    return;
  }

  // Arm a timeout so that a write which takes too long gets failed.
  if (sendTimeout_ > 0 && !writeTimeout_.scheduleTimeout(sendTimeout_)) {
    AsyncSocketException timeoutEx(
        AsyncSocketException::INTERNAL_ERROR,
        withAddr("failed to schedule send timeout"));
    return failWrite(__func__, timeoutEx);
  }
}