ErrorCode ThreadTransferHistory::setCheckpointAndReturnToQueue()

in util/ThreadTransferHistory.cpp [67:109]


// Applies a checkpoint to this history and hands every source past the
// checkpoint back to the queue.
//
// The first checkpoint.numBlocks entries of history_ are treated as received;
// every entry after them is popped off the back of history_, marked as failed
// and returned to queue_.
//
// @param checkpoint        checkpoint to apply; numBlocks is the number of
//                          history entries considered received
// @param globalCheckpoint  whether this is a global checkpoint; OR-ed into
//                          globalCheckpoint_ and forwarded to
//                          validateCheckpoint()
//
// @return INVALID_CHECKPOINT if numBlocks is negative, exceeds the history
//         size, or validateCheckpoint() rejects the checkpoint (no state is
//         modified in those cases); otherwise the code returned by
//         validateCheckpoint()
//
// NOTE(review): no lock is taken here; presumably callers already hold the
// lock protecting history_ - confirm against the call sites.
ErrorCode ThreadTransferHistory::setCheckpointAndReturnToQueue(
    const Checkpoint &checkpoint, bool globalCheckpoint) {
  const int64_t historySize = history_.size();
  const int64_t numReceivedSources = checkpoint.numBlocks;
  const int64_t lastBlockReceivedBytes = checkpoint.lastBlockReceivedBytes;
  if (numReceivedSources < 0) {
    // A negative count would make numFailedSources larger than the history,
    // and the loop below would then call back()/pop_back() on an empty vector.
    WLOG(ERROR) << "checkpoint has negative number of blocks "
                << numReceivedSources;
    return INVALID_CHECKPOINT;
  }
  if (numReceivedSources > historySize) {
    WLOG(ERROR)
        << "checkpoint is greater than total number of sources transferred "
        << historySize << " " << numReceivedSources;
    return INVALID_CHECKPOINT;
  }
  const ErrorCode errCode = validateCheckpoint(checkpoint, globalCheckpoint);
  if (errCode == INVALID_CHECKPOINT) {
    return INVALID_CHECKPOINT;
  }
  // From here on the checkpoint is accepted and the history state is updated.
  globalCheckpoint_ |= globalCheckpoint;
  lastCheckpoint_ = std::make_unique<Checkpoint>(checkpoint);
  const int64_t numFailedSources = historySize - numReceivedSources;
  if (numFailedSources == 0 && lastBlockReceivedBytes > 0 &&
      !globalCheckpoint) {
    // no block to apply checkpoint offset. This can happen if we receive same
    // local checkpoint without adding anything to the history
    WLOG(WARNING)
        << "Local checkpoint has received bytes for last block, but "
           "there are no unacked blocks in the history. Ignoring.";
  }
  numAcknowledged_ = numReceivedSources;
  std::vector<std::unique_ptr<ByteSource>> sourcesToReturn;
  sourcesToReturn.reserve(static_cast<size_t>(numFailedSources));
  // Sources are popped from the back, so sourcesToReturn ends up in reverse
  // history order.
  for (int64_t i = 0; i < numFailedSources; i++) {
    std::unique_ptr<ByteSource> source = std::move(history_.back());
    history_.pop_back();
    // Only the last source popped (the entry at index numBlocks, i.e. the
    // first one past the received prefix) is given the checkpoint; all others
    // are marked failed without it.
    const Checkpoint *checkpointPtr =
        (i == numFailedSources - 1 ? &checkpoint : nullptr);
    markSourceAsFailed(source, checkpointPtr);
    sourcesToReturn.emplace_back(std::move(source));
  }
  queue_.returnToQueue(sourcesToReturn);
  WLOG(INFO) << numFailedSources
             << " number of sources returned to queue, checkpoint: "
             << checkpoint;
  return errCode;
}