TransferStats SenderThread::sendOneByteSource()

in SenderThread.cpp [316:446]


/**
 * Sends one byte source over this thread's socket and reports what was sent.
 *
 * Wire layout produced by this function:
 *   1. Header: FILE_CMD byte, the transfer status byte, a 2-byte
 *      little-endian total header length, then the block details encoded by
 *      Protocol::encodeHeader.
 *   2. Data: every chunk returned by source->read() until source->finished().
 *   3. Footer (only when footerType_ != NO_FOOTER): FOOTER_CMD byte followed
 *      by the output of Protocol::encodeFooter. The checksum is accumulated
 *      with crc32c only when footerType_ == CHECKSUM_FOOTER; otherwise it
 *      stays 0.
 *
 * @param source          byte source to send; must be non-null (dereferenced
 *                        unconditionally)
 * @param transferStatus  status code written as the second header byte
 *
 * @return stats for this attempt. On success the local error code is OK and
 *         the block count is incremented. On failure the local error code is
 *         one of SOCKET_WRITE_ERROR, ABORT or BYTE_SOURCE_READ_ERROR and the
 *         failed-attempt counter is incremented.
 */
TransferStats SenderThread::sendOneByteSource(
    const std::unique_ptr<ByteSource> &source, ErrorCode transferStatus) {
  TransferStats stats;
  char headerBuf[Protocol::kMaxHeader];
  int64_t off = 0;
  headerBuf[off++] = Protocol::FILE_CMD;
  headerBuf[off++] = transferStatus;
  // Reserve two bytes for the header length; it is only known once the block
  // details have been encoded, so it is back-filled below.
  char *headerLenPtr = headerBuf + off;
  off += sizeof(int16_t);
  const int64_t expectedSize = source->getSize();
  int64_t actualSize = 0;
  const SourceMetaData &metadata = source->getMetaData();
  BlockDetails blockDetails;
  blockDetails.fileName = metadata.relPath;
  blockDetails.seqId = metadata.seqId;
  blockDetails.fileSize = metadata.size;
  blockDetails.offset = source->getOffset();
  blockDetails.dataSize = expectedSize;
  blockDetails.allocationStatus = metadata.allocationStatus;
  blockDetails.prevSeqId = metadata.prevSeqId;
  Protocol::encodeHeader(wdtParent_->getProtocolVersion(), headerBuf, off,
                         Protocol::kMaxHeader, blockDetails);
  // Back-fill the total header length in little-endian byte order.
  const int16_t littleEndianOff =
      folly::Endian::little(static_cast<int16_t>(off));
  folly::storeUnaligned<int16_t>(headerLenPtr, littleEndianOff);
  int64_t written = socket_->write(headerBuf, off);
  if (written != off) {
    WTPLOG(ERROR) << "Write error/mismatch " << written << " " << off
                  << ". fd = " << socket_->getFd()
                  << ". file = " << metadata.relPath
                  << ". port = " << socket_->getPort();
    stats.setLocalErrorCode(SOCKET_WRITE_ERROR);
    stats.incrFailedAttempts();
    return stats;
  }

  stats.addHeaderBytes(written);
  const int64_t byteSourceHeaderBytes = written;
  // Bytes to report on the next throttler call. Seeded with the header bytes
  // so that they are accounted for together with the first data chunk.
  int64_t throttlerInstanceBytes = byteSourceHeaderBytes;
  int64_t totalThrottlerBytes = 0;
  WTVLOG(3) << "Sent " << written << " on " << socket_->getFd() << " : "
            << folly::humanify(std::string(headerBuf, off));
  int32_t checksum = 0;
  while (!source->finished()) {
    // TODO: handle protocol errors from readHeartBeats
    readHeartBeats();

    int64_t size;
    char *buffer = source->read(size);
    if (source->hasError()) {
      WTLOG(ERROR) << "Failed reading file " << source->getIdentifier()
                   << " for fd " << socket_->getFd();
      // Leave the loop; the size comparison below decides the error code.
      break;
    }
    WDT_CHECK(buffer && size > 0);
    if (footerType_ == CHECKSUM_FOOTER) {
      // Running checksum over all chunks sent so far.
      checksum = folly::crc32c(reinterpret_cast<const uint8_t *>(buffer), size,
                               checksum);
    }
    if (wdtParent_->getThrottler()) {
      /**
       * If throttling is enabled we call limit(deltaBytes), which applies
       * both the peak and the average throttling methods.
       * Always call it with the bytes being written to the wire; the
       * throttler will do the rest.
       * The first time, the throttler is called with the header bytes
       * included. In later iterations it is only called with the data bytes
       * being written.
       */
      throttlerInstanceBytes += size;
      wdtParent_->getThrottler()->limit(*threadCtx_, throttlerInstanceBytes);
      totalThrottlerBytes += throttlerInstanceBytes;
      throttlerInstanceBytes = 0;
    }
    written = socket_->write(buffer, size, /* retry writes */ true);
    // An abort takes precedence over a short write when reporting the error.
    if (getThreadAbortCode() != OK) {
      WTLOG(ERROR) << "Transfer aborted during block transfer "
                   << socket_->getPort() << " " << source->getIdentifier();
      stats.setLocalErrorCode(ABORT);
      stats.incrFailedAttempts();
      return stats;
    }
    if (written != size) {
      WTLOG(ERROR) << "Write error " << written << " (" << size << ")"
                   << ". fd = " << socket_->getFd()
                   << ". file = " << metadata.relPath
                   << ". port = " << socket_->getPort();
      stats.setLocalErrorCode(SOCKET_WRITE_ERROR);
      stats.incrFailedAttempts();
      return stats;
    }
    stats.addDataBytes(written);
    actualSize += written;
  }
  if (actualSize != expectedSize) {
    // Can only happen if sender thread can not read complete source byte
    // stream
    WTLOG(ERROR) << "UGH " << source->getIdentifier() << " " << expectedSize
                 << " " << actualSize;
    // Log the current on-disk size next to the size recorded in the metadata
    // to help diagnose the mismatch.
    struct stat fileStat;
    if (stat(metadata.fullPath.c_str(), &fileStat) != 0) {
      WTPLOG(ERROR) << "stat failed on path " << metadata.fullPath;
    } else {
      WTLOG(WARNING) << "file " << source->getIdentifier() << " previous size "
                     << metadata.size << " current size " << fileStat.st_size;
    }
    stats.setLocalErrorCode(BYTE_SOURCE_READ_ERROR);
    stats.incrFailedAttempts();
    return stats;
  }
  // The throttler is only invoked inside the data loop, so the accounting
  // invariant is checked only when at least one data byte was sent.
  if (wdtParent_->getThrottler() && actualSize > 0) {
    // On failure, print the actual total followed by the expected total.
    WDT_CHECK(totalThrottlerBytes == actualSize + byteSourceHeaderBytes)
        << totalThrottlerBytes << " " << (actualSize + byteSourceHeaderBytes);
  }
  if (footerType_ != NO_FOOTER) {
    // Reuse the header buffer for the footer.
    off = 0;
    headerBuf[off++] = Protocol::FOOTER_CMD;
    Protocol::encodeFooter(headerBuf, off, Protocol::kMaxFooter, checksum);
    const int64_t toWrite = off;
    written = socket_->write(headerBuf, toWrite);
    if (written != toWrite) {
      WTLOG(ERROR) << "Write mismatch " << written << " " << toWrite;
      stats.setLocalErrorCode(SOCKET_WRITE_ERROR);
      stats.incrFailedAttempts();
      return stats;
    }
    stats.addHeaderBytes(toWrite);
  }
  stats.setLocalErrorCode(OK);
  stats.incrNumBlocks();
  stats.addEffectiveBytes(stats.getHeaderBytes(), stats.getDataBytes());
  return stats;
}