in util/ThreadTransferHistory.cpp [67:109]
ErrorCode ThreadTransferHistory::setCheckpointAndReturnToQueue(
const Checkpoint &checkpoint, bool globalCheckpoint) {
const int64_t historySize = history_.size();
int64_t numReceivedSources = checkpoint.numBlocks;
int64_t lastBlockReceivedBytes = checkpoint.lastBlockReceivedBytes;
if (numReceivedSources > historySize) {
WLOG(ERROR)
<< "checkpoint is greater than total number of sources transferred "
<< history_.size() << " " << numReceivedSources;
return INVALID_CHECKPOINT;
}
ErrorCode errCode = validateCheckpoint(checkpoint, globalCheckpoint);
if (errCode == INVALID_CHECKPOINT) {
return INVALID_CHECKPOINT;
}
globalCheckpoint_ |= globalCheckpoint;
lastCheckpoint_ = std::make_unique<Checkpoint>(checkpoint);
int64_t numFailedSources = historySize - numReceivedSources;
if (numFailedSources == 0 && lastBlockReceivedBytes > 0) {
if (!globalCheckpoint) {
// no block to apply checkpoint offset. This can happen if we receive same
// local checkpoint without adding anything to the history
WLOG(WARNING)
<< "Local checkpoint has received bytes for last block, but "
"there are no unacked blocks in the history. Ignoring.";
}
}
numAcknowledged_ = numReceivedSources;
std::vector<std::unique_ptr<ByteSource>> sourcesToReturn;
for (int64_t i = 0; i < numFailedSources; i++) {
std::unique_ptr<ByteSource> source = std::move(history_.back());
history_.pop_back();
const Checkpoint *checkpointPtr =
(i == numFailedSources - 1 ? &checkpoint : nullptr);
markSourceAsFailed(source, checkpointPtr);
sourcesToReturn.emplace_back(std::move(source));
}
queue_.returnToQueue(sourcesToReturn);
WLOG(INFO) << numFailedSources
<< " number of sources returned to queue, checkpoint: "
<< checkpoint;
return errCode;
}