in util/DirectorySourceQueue.cpp [680:722]
/**
 * Hands the next queued source to the calling thread, opening it first.
 *
 * Blocks on conditionNotEmpty_ while the queue is empty and initFinished_ is
 * not yet set. A source whose open() does not return OK is closed, its
 * transfer stats are appended to failedSourceStats_, and the next queued
 * source is tried instead.
 *
 * @param callerThreadCtx  context forwarded to ByteSource::open()
 * @param status           set to ERROR when failedSourceStats_ or
 *                         failedDirectories_ is non-empty at the moment the
 *                         queue is inspected, OK otherwise
 *
 * @return the successfully opened source, or nullptr if the queue is empty
 *         once the wait is over
 */
std::unique_ptr<ByteSource> DirectorySourceQueue::getNextSource(
    ThreadCtx *callerThreadCtx, ErrorCode &status) {
  std::unique_ptr<ByteSource> source;
  for (;;) {
    std::unique_lock<std::mutex> lock(mutex_);
    ++numWaiters_;
    // wake up anyone waiting for the previous batch to finish
    conditionPrevTransfer_.notify_all();
    conditionNotEmpty_.wait(
        lock, [this] { return !sourceQueue_.empty() || initFinished_; });
    --numWaiters_;
    if (failedSourceStats_.empty() && failedDirectories_.empty()) {
      status = OK;
    } else {
      status = ERROR;
    }
    if (sourceQueue_.empty()) {
      return nullptr;
    }
    // priority_queue::top() only exposes a const reference, so the constness
    // is cast away to move the owning pointer out before popping
    auto &topSlot =
        const_cast<std::unique_ptr<ByteSource> &>(sourceQueue_.top());
    source = std::move(topSlot);
    sourceQueue_.pop();
    if (initFinished_ && sourceQueue_.empty()) {
      conditionNotEmpty_.notify_all();
    }
    // the open attempt below runs without holding the queue mutex
    lock.unlock();
    WVLOG(1) << "got next source " << rootDir_ + source->getIdentifier()
             << " size " << source->getSize();
    const bool opened = (source->open(callerThreadCtx) == OK);
    if (!opened) {
      source->close();
      // failedSourceStats_ is only modified with the mutex held, so take the
      // lock again before recording the failure, then try the next source
      lock.lock();
      failedSourceStats_.emplace_back(std::move(source->getTransferStats()));
      continue;
    }
    lock.lock();
    numBlocksDequeued_++;
    return source;
  }
}