void ThreadTransferHistory::markSourceAsFailed()

in util/ThreadTransferHistory.cpp [181:226]


void ThreadTransferHistory::markSourceAsFailed(
    std::unique_ptr<ByteSource> &source, const Checkpoint *checkpoint) {
  auto &metadata = source->getMetaData();
  bool validCheckpoint = false;
  if (checkpoint != nullptr) {
    if (checkpoint->hasSeqId) {
      if ((checkpoint->lastBlockSeqId == metadata.seqId) &&
          (checkpoint->lastBlockOffset == source->getOffset())) {
        validCheckpoint = true;
      } else {
        WLOG(WARNING)
            << "Checkpoint block does not match history block. Checkpoint: "
            << checkpoint->lastBlockSeqId << ", " << checkpoint->lastBlockOffset
            << " History: " << metadata.seqId << ", " << source->getOffset();
      }
    } else {
      // Receiver at lower version!
      // checkpoint does not have seq-id. We have to blindly trust
      // lastBlockReceivedBytes. If we do not, transfer will fail because of
      // number of bytes mismatch. Even if an error happens because of this,
      // Receiver will fail.
      validCheckpoint = true;
    }
  }
  int64_t receivedBytes =
      (validCheckpoint ? checkpoint->lastBlockReceivedBytes : 0);
  TransferStats &sourceStats = source->getTransferStats();
  if (sourceStats.getLocalErrorCode() != OK) {
    // already marked as failed
    sourceStats.addEffectiveBytes(0, receivedBytes);
    threadStats_.addEffectiveBytes(0, receivedBytes);
  } else {
    auto dataBytes = source->getSize();
    auto headerBytes = sourceStats.getEffectiveHeaderBytes();
    int64_t wastedBytes = dataBytes - receivedBytes;
    sourceStats.subtractEffectiveBytes(headerBytes, wastedBytes);
    sourceStats.decrNumBlocks();
    sourceStats.setLocalErrorCode(SOCKET_WRITE_ERROR);
    sourceStats.incrFailedAttempts();

    threadStats_.subtractEffectiveBytes(headerBytes, wastedBytes);
    threadStats_.decrNumBlocks();
    threadStats_.incrFailedAttempts();
  }
  source->advanceOffset(receivedBytes);
}