std::unique_ptr<ByteSource> DirectorySourceQueue::getNextSource()

in util/DirectorySourceQueue.cpp [680:722]


/**
 * Dequeues sources until one of them opens successfully and returns it.
 *
 * Blocks on conditionNotEmpty_ while the queue is empty and initFinished_ is
 * not yet set. A source whose open() does not return OK is closed, its
 * transfer stats are appended to failedSourceStats_, and the next queue entry
 * is tried.
 *
 * @param callerThreadCtx  forwarded unchanged to ByteSource::open()
 * @param status           out-param: ERROR if failedSourceStats_ or
 *                         failedDirectories_ is non-empty when checked after
 *                         the wait, OK otherwise
 *
 * @return the opened source, or nullptr if the queue is still empty once the
 *         wait is over (which requires initFinished_ to be set)
 */
std::unique_ptr<ByteSource> DirectorySourceQueue::getNextSource(
    ThreadCtx *callerThreadCtx, ErrorCode &status) {
  // declared outside the loop: a source that failed to open stays alive until
  // it is overwritten by the next dequeue or the function returns
  std::unique_ptr<ByteSource> nextSource;
  for (;;) {
    std::unique_lock<std::mutex> guard(mutex_);
    ++numWaiters_;
    // wake up anyone waiting for the previous batch to finish
    conditionPrevTransfer_.notify_all();
    // sleep until there is something to dequeue or initialization is over
    conditionNotEmpty_.wait(
        guard, [this] { return !sourceQueue_.empty() || initFinished_; });
    --numWaiters_;

    const bool sawFailures =
        !failedSourceStats_.empty() || !failedDirectories_.empty();
    status = sawFailures ? ERROR : OK;

    if (sourceQueue_.empty()) {
      return nullptr;
    }

    // priority_queue::top() only hands out a const reference, so the const
    // has to be cast away before the pointer can be moved out
    auto &head = const_cast<std::unique_ptr<ByteSource> &>(sourceQueue_.top());
    nextSource = std::move(head);
    sourceQueue_.pop();
    if (initFinished_ && sourceQueue_.empty()) {
      // last entry taken and no more are coming: release every thread still
      // blocked on conditionNotEmpty_
      conditionNotEmpty_.notify_all();
    }
    guard.unlock();

    WVLOG(1) << "got next source " << rootDir_ + nextSource->getIdentifier()
             << " size " << nextSource->getSize();

    // try to open the source (done without holding the mutex)
    if (nextSource->open(callerThreadCtx) != OK) {
      nextSource->close();
      // failedSourceStats_ is shared state, so take the mutex again before
      // appending; the next pass will then report ERROR through status
      guard.lock();
      failedSourceStats_.emplace_back(
          std::move(nextSource->getTransferStats()));
      continue;
    }

    // counter is updated under the mutex; guard releases it on return
    guard.lock();
    ++numBlocksDequeued_;
    return nextSource;
  }
}