in SenderThread.cpp [316:446]
TransferStats SenderThread::sendOneByteSource(
const std::unique_ptr<ByteSource> &source, ErrorCode transferStatus) {
TransferStats stats;
char headerBuf[Protocol::kMaxHeader];
int64_t off = 0;
headerBuf[off++] = Protocol::FILE_CMD;
headerBuf[off++] = transferStatus;
char *headerLenPtr = headerBuf + off;
off += sizeof(int16_t);
const int64_t expectedSize = source->getSize();
int64_t actualSize = 0;
const SourceMetaData &metadata = source->getMetaData();
BlockDetails blockDetails;
blockDetails.fileName = metadata.relPath;
blockDetails.seqId = metadata.seqId;
blockDetails.fileSize = metadata.size;
blockDetails.offset = source->getOffset();
blockDetails.dataSize = expectedSize;
blockDetails.allocationStatus = metadata.allocationStatus;
blockDetails.prevSeqId = metadata.prevSeqId;
Protocol::encodeHeader(wdtParent_->getProtocolVersion(), headerBuf, off,
Protocol::kMaxHeader, blockDetails);
int16_t littleEndianOff = folly::Endian::little((int16_t)off);
folly::storeUnaligned<int16_t>(headerLenPtr, littleEndianOff);
int64_t written = socket_->write(headerBuf, off);
if (written != off) {
WTPLOG(ERROR) << "Write error/mismatch " << written << " " << off
<< ". fd = " << socket_->getFd()
<< ". file = " << metadata.relPath
<< ". port = " << socket_->getPort();
stats.setLocalErrorCode(SOCKET_WRITE_ERROR);
stats.incrFailedAttempts();
return stats;
}
stats.addHeaderBytes(written);
int64_t byteSourceHeaderBytes = written;
int64_t throttlerInstanceBytes = byteSourceHeaderBytes;
int64_t totalThrottlerBytes = 0;
WTVLOG(3) << "Sent " << written << " on " << socket_->getFd() << " : "
<< folly::humanify(std::string(headerBuf, off));
int32_t checksum = 0;
while (!source->finished()) {
// TODO: handle protocol errors from readHeartBeats
readHeartBeats();
int64_t size;
char *buffer = source->read(size);
if (source->hasError()) {
WTLOG(ERROR) << "Failed reading file " << source->getIdentifier()
<< " for fd " << socket_->getFd();
break;
}
WDT_CHECK(buffer && size > 0);
if (footerType_ == CHECKSUM_FOOTER) {
checksum = folly::crc32c((const uint8_t *)buffer, size, checksum);
}
if (wdtParent_->getThrottler()) {
/**
* If throttling is enabled we call limit(deltaBytes) which
* used both the methods of throttling peak and average.
* Always call it with bytes being written to the wire, throttler
* will do the rest.
* The first time throttle is called with the header bytes
* included. In the next iterations throttler is only called
* with the bytes being written.
*/
throttlerInstanceBytes += size;
wdtParent_->getThrottler()->limit(*threadCtx_, throttlerInstanceBytes);
totalThrottlerBytes += throttlerInstanceBytes;
throttlerInstanceBytes = 0;
}
written = socket_->write(buffer, size, /* retry writes */ true);
if (getThreadAbortCode() != OK) {
WTLOG(ERROR) << "Transfer aborted during block transfer "
<< socket_->getPort() << " " << source->getIdentifier();
stats.setLocalErrorCode(ABORT);
stats.incrFailedAttempts();
return stats;
}
if (written != size) {
WTLOG(ERROR) << "Write error " << written << " (" << size << ")"
<< ". fd = " << socket_->getFd()
<< ". file = " << metadata.relPath
<< ". port = " << socket_->getPort();
stats.setLocalErrorCode(SOCKET_WRITE_ERROR);
stats.incrFailedAttempts();
return stats;
}
stats.addDataBytes(written);
actualSize += written;
}
if (actualSize != expectedSize) {
// Can only happen if sender thread can not read complete source byte
// stream
WTLOG(ERROR) << "UGH " << source->getIdentifier() << " " << expectedSize
<< " " << actualSize;
struct stat fileStat;
if (stat(metadata.fullPath.c_str(), &fileStat) != 0) {
WTPLOG(ERROR) << "stat failed on path " << metadata.fullPath;
} else {
WTLOG(WARNING) << "file " << source->getIdentifier() << " previous size "
<< metadata.size << " current size " << fileStat.st_size;
}
stats.setLocalErrorCode(BYTE_SOURCE_READ_ERROR);
stats.incrFailedAttempts();
return stats;
}
if (wdtParent_->getThrottler() && actualSize > 0) {
WDT_CHECK(totalThrottlerBytes == actualSize + byteSourceHeaderBytes)
<< totalThrottlerBytes << " " << (actualSize + totalThrottlerBytes);
}
if (footerType_ != NO_FOOTER) {
off = 0;
headerBuf[off++] = Protocol::FOOTER_CMD;
Protocol::encodeFooter(headerBuf, off, Protocol::kMaxFooter, checksum);
int toWrite = off;
written = socket_->write(headerBuf, toWrite);
if (written != toWrite) {
WTLOG(ERROR) << "Write mismatch " << written << " " << toWrite;
stats.setLocalErrorCode(SOCKET_WRITE_ERROR);
stats.incrFailedAttempts();
return stats;
}
stats.addHeaderBytes(toWrite);
}
stats.setLocalErrorCode(OK);
stats.incrNumBlocks();
stats.addEffectiveBytes(stats.getHeaderBytes(), stats.getDataBytes());
return stats;
}