in util/ThreadTransferHistory.cpp [181:226]
void ThreadTransferHistory::markSourceAsFailed(
std::unique_ptr<ByteSource> &source, const Checkpoint *checkpoint) {
auto &metadata = source->getMetaData();
bool validCheckpoint = false;
if (checkpoint != nullptr) {
if (checkpoint->hasSeqId) {
if ((checkpoint->lastBlockSeqId == metadata.seqId) &&
(checkpoint->lastBlockOffset == source->getOffset())) {
validCheckpoint = true;
} else {
WLOG(WARNING)
<< "Checkpoint block does not match history block. Checkpoint: "
<< checkpoint->lastBlockSeqId << ", " << checkpoint->lastBlockOffset
<< " History: " << metadata.seqId << ", " << source->getOffset();
}
} else {
// Receiver at lower version!
// checkpoint does not have seq-id. We have to blindly trust
// lastBlockReceivedBytes. If we do not, transfer will fail because of
// number of bytes mismatch. Even if an error happens because of this,
// Receiver will fail.
validCheckpoint = true;
}
}
int64_t receivedBytes =
(validCheckpoint ? checkpoint->lastBlockReceivedBytes : 0);
TransferStats &sourceStats = source->getTransferStats();
if (sourceStats.getLocalErrorCode() != OK) {
// already marked as failed
sourceStats.addEffectiveBytes(0, receivedBytes);
threadStats_.addEffectiveBytes(0, receivedBytes);
} else {
auto dataBytes = source->getSize();
auto headerBytes = sourceStats.getEffectiveHeaderBytes();
int64_t wastedBytes = dataBytes - receivedBytes;
sourceStats.subtractEffectiveBytes(headerBytes, wastedBytes);
sourceStats.decrNumBlocks();
sourceStats.setLocalErrorCode(SOCKET_WRITE_ERROR);
sourceStats.incrFailedAttempts();
threadStats_.subtractEffectiveBytes(headerBytes, wastedBytes);
threadStats_.decrNumBlocks();
threadStats_.incrFailedAttempts();
}
source->advanceOffset(receivedBytes);
}