util/DirectorySourceQueue.cpp:

/**
 * Copyright (c) 2014-present, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */
#include <wdt/util/DirectorySourceQueue.h>

#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <wdt/Protocol.h>
#include <algorithm>
#include <set>
#include <utility>
#include <fcntl.h>
#include <regex>

// NOTE: this should remain standalone code and not use WdtOptions directly
// also note this is used not just by the Sender but also by the receiver
// (so code like opening files during discovery is disabled by default and
// no reading the config directly from the options and only set by the Sender)

namespace facebook {
namespace wdt {

using std::string;

WdtFileInfo::WdtFileInfo(const string &name, int64_t size, bool doDirectReads)
    : fileName(name), fileSize(size), directReads(doDirectReads) {
}

WdtFileInfo::WdtFileInfo(int fd, int64_t size, const string &name)
    : WdtFileInfo(name, size, false) {
  this->fd = fd;
}

void WdtFileInfo::verifyAndFixFlags() {
  if (fd >= 0) {
#ifdef O_DIRECT
    int flags = fcntl(fd, F_GETFL, 0);
    // directReads does not depend on the option in this case
    directReads = (flags & O_DIRECT);
    // Do not have to worry about F_NOCACHE, since it has no alignment
    // requirement
#endif
  }
  if (directReads) {
#ifndef WDT_SUPPORTS_ODIRECT
    WLOG(WARNING) << "Wdt can't handle O_DIRECT in this system. " << fileName;
    directReads = false;
#endif
  }
}

DirectorySourceQueue::DirectorySourceQueue(const WdtOptions &options,
                                           const string &rootDir,
                                           IAbortChecker const *abortChecker) {
  threadCtx_ =
      std::make_unique<ThreadCtx>(options, /* do not allocate buffer */ false);
  threadCtx_->setAbortChecker(abortChecker);
  setRootDir(rootDir);
}

void DirectorySourceQueue::setIncludePattern(const string &includePattern) {
  includePattern_ = includePattern;
}

void DirectorySourceQueue::setExcludePattern(const string &excludePattern) {
  excludePattern_ = excludePattern;
}

void DirectorySourceQueue::setPruneDirPattern(const string &pruneDirPattern) {
  pruneDirPattern_ = pruneDirPattern;
}

void DirectorySourceQueue::setBlockSizeMbytes(int64_t blockSizeMbytes) {
  blockSizeMbytes_ = blockSizeMbytes;
}

void DirectorySourceQueue::setFileInfo(
    const std::vector<WdtFileInfo> &fileInfo) {
  fileInfo_ = fileInfo;
  exploreDirectory_ = false;
}

void DirectorySourceQueue::setFileInfoGenerator(
    WdtTransferRequest::FileInfoGenerator gen) {
  fileInfoGenerator_ = std::move(gen);
  exploreDirectory_ = false;
}

std::vector<WdtFileInfo> DirectorySourceQueue::getFileInfo() const {
  std::lock_guard<std::mutex> lock(mutex_);
  return fileInfo_;
}

void DirectorySourceQueue::setFollowSymlinks(const bool followSymlinks) {
  followSymlinks_ = followSymlinks;
  if (followSymlinks_) {
    setRootDir(rootDir_);
  }
}

std::vector<SourceMetaData *>
    &DirectorySourceQueue::getDiscoveredFilesMetaData() {
  return sharedFileData_;
}

// const ref string param but first thing we do is make a copy because
// of logging original input vs resolved one
bool DirectorySourceQueue::setRootDir(const string &newRootDir) {
  if (newRootDir.empty()) {
    WLOG(ERROR) << "Invalid empty root dir!";
    return false;
  }
  string dir(newRootDir);
  if (followSymlinks_) {
    dir.assign(resolvePath(newRootDir));
    if (dir.empty()) {
      // error already logged
      return false;
    }
    WLOG(INFO) << "Following symlinks " << newRootDir << " -> " << dir;
  }
  if (dir.back() != '/') {
    dir.push_back('/');
  }
  if (dir != rootDir_) {
    rootDir_.assign(dir);
    WLOG(INFO) << "Root dir now " << rootDir_;
  }
  return true;
}

void DirectorySourceQueue::clearSourceQueue() {
  // clear current content of the queue. For some reason, priority_queue does
  // not have a clear method
  while (!sourceQueue_.empty()) {
    sourceQueue_.pop();
  }
}

void DirectorySourceQueue::setPreviouslyReceivedChunks(
    std::vector<FileChunksInfo> &previouslyTransferredChunks) {
  std::unique_lock<std::mutex> lock(mutex_);
  WDT_CHECK_EQ(0, numBlocksDequeued_);
  // reset all the queue variables
  nextSeqId_ = 0;
  totalFileSize_ = 0;
  numEntries_ = 0;
  numBlocks_ = 0;
  for (auto &chunkInfo : previouslyTransferredChunks) {
    nextSeqId_ = std::max(nextSeqId_, chunkInfo.getSeqId() + 1);
    auto fileName = chunkInfo.getFileName();
    previouslyTransferredChunks_.insert(
        std::make_pair(std::move(fileName), std::move(chunkInfo)));
  }
  clearSourceQueue();
  // recreate the queue
  for (const auto metadata : sharedFileData_) {
    // TODO: do not notify inside createIntoQueueInternal. This method still
    // holds the lock, so no point in notifying
    createIntoQueueInternal(metadata);
  }
  enqueueFilesToBeDeleted();
}

DirectorySourceQueue::~DirectorySourceQueue() {
  // need to remove all the sources because they access metadata at the
  // destructor.
  clearSourceQueue();
  for (SourceMetaData *fileData : sharedFileData_) {
    if (fileData->needToClose && fileData->fd >= 0) {
      int ret = ::close(fileData->fd);
      if (ret) {
        WPLOG(ERROR) << "Failed to close file " << fileData->fullPath;
      }
    }
    delete fileData;
  }
}

std::thread DirectorySourceQueue::buildQueueAsynchronously() {
  // relying on RVO (and thread not copyable to avoid multiple ones)
  return std::thread(&DirectorySourceQueue::buildQueueSynchronously, this);
}

bool DirectorySourceQueue::buildQueueSynchronously() {
  auto startTime = Clock::now();
  WVLOG(1) << "buildQueueSynchronously() called";
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (initCalled_) {
      return false;
    }
    initCalled_ = true;
  }
  bool res = false;
  // either traverse directory or we already have a fixed set of candidate
  // files
  if (exploreDirectory_) {
    res = explore();
  } else {
    WLOG(INFO) << "Using list of file info. Number of files "
               << fileInfo_.size();
    res = enqueueFiles(fileInfo_);
    // also check if there is a generator to get more files
    while (res && fileInfoGenerator_) {
      auto files = fileInfoGenerator_();
      if (!files) {
        break;
      }
      res = enqueueFiles(*files);
      std::lock_guard<std::mutex> lock(mutex_);
      fileInfo_.insert(fileInfo_.end(), files->begin(), files->end());
    }
  }
  {
    std::lock_guard<std::mutex> lock(mutex_);
    initFinished_ = true;
    enqueueFilesToBeDeleted();
    // TODO: comment why
    if (sourceQueue_.empty()) {
      conditionNotEmpty_.notify_all();
    }
  }
  directoryTime_ = durationSeconds(Clock::now() - startTime);
  WVLOG(1) << "finished initialization of DirectorySourceQueue in "
           << directoryTime_;
  return res;
}

// TODO: move this and a bunch of stuff into FileUtil and/or System class
string DirectorySourceQueue::resolvePath(const string &path) {
  // Use realpath() as it resolves to a nice canonicalized
  // full path we can used for the stat() call later,
  // readlink could still give us a relative path
  // and making sure the output buffer is sized appropriately
  // can be ugly
  string result;
  char *resolvedPath = realpath(path.c_str(), nullptr);
  if (!resolvedPath) {
    WPLOG(ERROR) << "Couldn't resolve " << path;
    return result;  // empty string == error
  }
  result.assign(resolvedPath);
  free(resolvedPath);
  WVLOG(3) << "resolvePath(\"" << path << "\") -> " << result;
  return result;
}

bool DirectorySourceQueue::explore() {
  WLOG(INFO) << "Exploring root dir " << rootDir_
             << " include_pattern : " << includePattern_
             << " exclude_pattern : " << excludePattern_
             << " prune_dir_pattern : " << pruneDirPattern_;
  WDT_CHECK(!rootDir_.empty());
  bool hasError = false;
  std::set<string> visited;
  std::regex includeRegex(includePattern_);
  std::regex excludeRegex(excludePattern_);
  std::regex pruneDirRegex(pruneDirPattern_);
  std::deque<string> todoList;
  todoList.push_back("");
  while (!todoList.empty()) {
    if (threadCtx_->getAbortChecker()->shouldAbort()) {
      WLOG(ERROR) << "Directory transfer thread aborted";
      hasError = true;
      break;
    }
    // would be nice to do those 2 in 1 call...
    auto relativePath = todoList.front();
    todoList.pop_front();
    const string fullPath = rootDir_ + relativePath;
    WVLOG(1) << "Processing directory " << fullPath;
    DIR *dirPtr = opendir(fullPath.c_str());
    if (!dirPtr) {
      WPLOG(ERROR) << "Error opening dir " << fullPath;
      failedDirectories_.emplace_back(fullPath);
      hasError = true;
      continue;
    }
    // http://elliotth.blogspot.com/2012/10/how-not-to-use-readdirr3.html
    // tl;dr readdir is actually better than readdir_r ! (because of the
    // nastiness of calculating correctly buffer size and race conditions
    // there)
    struct dirent *dirEntryRes = nullptr;
    while (true) {
      if (threadCtx_->getAbortChecker()->shouldAbort()) {
        break;
      }
      errno = 0;  // yes that's right
      dirEntryRes = readdir(dirPtr);
      if (!dirEntryRes) {
        if (errno) {
          WPLOG(ERROR) << "Error reading dir " << fullPath;
          // closedir always called
          hasError = true;
        } else {
          WVLOG(2) << "Done with " << fullPath;
          // finished reading dir
        }
        break;
      }
      const auto dType = dirEntryRes->d_type;
      WVLOG(2) << "Found entry " << dirEntryRes->d_name << " type "
               << (int)dType;
      if (dirEntryRes->d_name[0] == '.') {
        if (dirEntryRes->d_name[1] == '\0' ||
            (dirEntryRes->d_name[1] == '.' &&
             dirEntryRes->d_name[2] == '\0')) {
          WVLOG(3) << "Skipping entry : " << dirEntryRes->d_name;
          continue;
        }
      }
      // Following code is a bit ugly trying to save stat() call for
      // directories yet still work for xfs which returns DT_UNKNOWN for
      // everything would be simpler to always stat()
      // if we reach DT_DIR and DT_REG directly:
      bool isDir = (dType == DT_DIR);
      bool isLink = (dType == DT_LNK);
      bool keepEntry = (isDir || dType == DT_REG || dType == DT_UNKNOWN);
      if (followSymlinks_) {
        keepEntry |= isLink;
      }
      if (!keepEntry) {
        WVLOG(3) << "Ignoring entry type " << (int)(dType);
        continue;
      }
      string newRelativePath = relativePath + string(dirEntryRes->d_name);
      string newFullPath = rootDir_ + newRelativePath;
      if (!isDir) {
        // DT_REG, DT_LNK or DT_UNKNOWN cases
        struct stat fileStat;
        // On XFS we don't know yet if this is a symlink, so check
        // if following symlinks is ok we will do stat() too
        if (lstat(newFullPath.c_str(), &fileStat) != 0) {
          WPLOG(ERROR) << "lstat() failed on path " << newFullPath;
          hasError = true;
          continue;
        }
        isLink = S_ISLNK(fileStat.st_mode);
        WVLOG(2) << "lstat for " << newFullPath << " is link ? " << isLink;
        if (followSymlinks_ && isLink) {
          // Use stat to see if the pointed file is of the right type
          // (overrides previous stat call result)
          if (stat(newFullPath.c_str(), &fileStat) != 0) {
            WPLOG(ERROR) << "stat() failed on path " << newFullPath;
            hasError = true;
            continue;
          }
          newFullPath = resolvePath(newFullPath);
          if (newFullPath.empty()) {
            // already logged error
            hasError = true;
            continue;
          }
          WVLOG(2) << "Resolved symlink " << dirEntryRes->d_name << " to "
                   << newFullPath;
        }
        // could dcheck that if DT_REG we better be !isDir
        isDir = S_ISDIR(fileStat.st_mode);
        // if we were DT_UNKNOWN this could still be a symlink, block device
        // etc... (xfs)
        if (S_ISREG(fileStat.st_mode)) {
          WVLOG(2) << "Found file " << newFullPath << " of size "
                   << fileStat.st_size;
          if (!excludePattern_.empty() &&
              std::regex_match(newRelativePath, excludeRegex)) {
            continue;
          }
          if (!includePattern_.empty() &&
              !std::regex_match(newRelativePath, includeRegex)) {
            continue;
          }
          WdtFileInfo fileInfo(newRelativePath, fileStat.st_size,
                               directReads_);
          createIntoQueue(newFullPath, fileInfo);
          continue;
        }
      }
      if (isDir) {
        if (followSymlinks_) {
          if (visited.find(newFullPath) != visited.end()) {
            WLOG(ERROR) << "Attempted to visit directory twice: "
                        << newFullPath;
            hasError = true;
            continue;
          }
          // TODO: consider custom hashing ignoring common prefix
          visited.insert(newFullPath);
        }
        newRelativePath.push_back('/');
        if (pruneDirPattern_.empty() ||
            !std::regex_match(newRelativePath, pruneDirRegex)) {
          WVLOG(2) << "Adding " << newRelativePath;
          todoList.push_back(std::move(newRelativePath));
        }
      }
    }
    closedir(dirPtr);
  }
  WLOG(INFO) << "Number of files explored: " << numEntries_ << " opened "
             << numFilesOpened_ << " with direct " << numFilesOpenedWithDirect_
             << " errors " << std::boolalpha << hasError;
  return !hasError;
}

void DirectorySourceQueue::smartNotify(int32_t addedSource) {
  if (addedSource >= numClientThreads_) {
    conditionNotEmpty_.notify_all();
    return;
  }
  for (int i = 0; i < addedSource; i++) {
    conditionNotEmpty_.notify_one();
  }
}

void DirectorySourceQueue::returnToQueue(
    std::vector<std::unique_ptr<ByteSource>> &sources) {
  int returnedCount = 0;
  std::unique_lock<std::mutex> lock(mutex_);
  for (auto &source : sources) {
    sourceQueue_.push(std::move(source));
    returnedCount++;
    WDT_CHECK_GT(numBlocksDequeued_, 0);
    numBlocksDequeued_--;
  }
  lock.unlock();
  smartNotify(returnedCount);
}

void DirectorySourceQueue::returnToQueue(std::unique_ptr<ByteSource> &source) {
  std::vector<std::unique_ptr<ByteSource>> sources;
  sources.emplace_back(std::move(source));
  returnToQueue(sources);
}

void DirectorySourceQueue::createIntoQueue(const string &fullPath,
                                           WdtFileInfo &fileInfo) {
  // TODO: currently we are treating small files(size less than blocksize) as
  // blocks. Also, we transfer file name in the header for all the blocks for a
  // large file. This can be optimized as follows -
  // a) if filesize < blocksize, we do not send blocksize and offset in the
  // header. This should be useful for tiny files(0-few hundred bytes). We will
  // have to use separate header format and commands for files and blocks.
  // b) if filesize > blocksize, we can use send filename only in the first
  // block and use a shorter header for subsequent blocks. Also, we can remove
  // block size once negotiated, since blocksize is sort of fixed.
  fileInfo.verifyAndFixFlags();
  SourceMetaData *metadata = new SourceMetaData();
  metadata->fullPath = fullPath;
  metadata->relPath = fileInfo.fileName;
  metadata->fd = fileInfo.fd;
  metadata->directReads = fileInfo.directReads;
  metadata->size = fileInfo.fileSize;
  if ((openFilesDuringDiscovery_ != 0) && (metadata->fd < 0)) {
    metadata->fd =
        FileUtil::openForRead(*threadCtx_, fullPath, metadata->directReads);
    ++numFilesOpened_;
    if (metadata->directReads) {
      ++numFilesOpenedWithDirect_;
    }
    metadata->needToClose = (metadata->fd >= 0);
    // works for -1 up to 4B files
    if (--openFilesDuringDiscovery_ == 0) {
      WLOG(WARNING) << "Already opened " << numFilesOpened_
                    << " files, will open the reminder as they are sent";
    }
  }
  std::unique_lock<std::mutex> lock(mutex_);
  sharedFileData_.emplace_back(metadata);
  createIntoQueueInternal(metadata);
}

void DirectorySourceQueue::createIntoQueueInternal(SourceMetaData *metadata) {
  // TODO: currently we are treating small files(size less than blocksize) as
  // blocks. Also, we transfer file name in the header for all the blocks for a
  // large file. This can be optimized as follows -
  // a) if filesize < blocksize, we do not send blocksize and offset in the
  // header. This should be useful for tiny files(0-few hundred bytes). We will
  // have to use separate header format and commands for files and blocks.
  // b) if filesize > blocksize, we can use send filename only in the first
  // block and use a shorter header for subsequent blocks. Also, we can remove
  // block size once negotiated, since blocksize is sort of fixed.
  auto &fileSize = metadata->size;
  auto &relPath = metadata->relPath;
  int64_t blockSizeBytes = blockSizeMbytes_ * 1024 * 1024;
  bool enableBlockTransfer = blockSizeBytes > 0;
  if (!enableBlockTransfer) {
    WVLOG(2) << "Block transfer disabled for this transfer";
  }
  // if block transfer is disabled, treating fileSize as block size. This
  // ensures that we create a single block
  auto blockSize = enableBlockTransfer ? blockSizeBytes : fileSize;
  int blockCount = 0;
  std::vector<Interval> remainingChunks;
  int64_t seqId;
  FileAllocationStatus allocationStatus;
  int64_t prevSeqId = 0;
  auto it = previouslyTransferredChunks_.find(relPath);
  if (it == previouslyTransferredChunks_.end()) {
    // No previously transferred chunks
    remainingChunks.emplace_back(0, fileSize);
    seqId = nextSeqId_++;
    allocationStatus = NOT_EXISTS;
  } else if (it->second.getFileSize() > fileSize) {
    // file size is greater on the receiver side
    remainingChunks.emplace_back(0, fileSize);
    seqId = nextSeqId_++;
    WLOG(INFO) << "File size is greater in the receiver side " << relPath
               << " " << fileSize << " " << it->second.getFileSize();
    allocationStatus = EXISTS_TOO_LARGE;
    prevSeqId = it->second.getSeqId();
  } else {
    auto &fileChunksInfo = it->second;
    // Some portion of the file was sent in previous transfers. Receiver sends
    // the list of chunks to the sender. Adding all the bytes of those chunks
    // should give us the number of bytes saved due to incremental download
    previouslySentBytes_ += fileChunksInfo.getTotalChunkSize();
    remainingChunks = fileChunksInfo.getRemainingChunks(fileSize);
    if (remainingChunks.empty()) {
      WLOG(INFO) << relPath << " completely sent in previous transfer";
      return;
    }
    seqId = fileChunksInfo.getSeqId();
    allocationStatus = it->second.getFileSize() < fileSize
                           ? EXISTS_TOO_SMALL
                           : EXISTS_CORRECT_SIZE;
  }
  metadata->seqId = seqId;
  metadata->prevSeqId = prevSeqId;
  metadata->allocationStatus = allocationStatus;
  for (const auto &chunk : remainingChunks) {
    int64_t offset = chunk.start_;
    int64_t remainingBytes = chunk.size();
    do {
      const int64_t size = std::min<int64_t>(remainingBytes, blockSize);
      std::unique_ptr<ByteSource> source =
          std::make_unique<FileByteSource>(metadata, size, offset);
      sourceQueue_.push(std::move(source));
      remainingBytes -= size;
      offset += size;
      blockCount++;
    } while (remainingBytes > 0);
    totalFileSize_ += chunk.size();
  }
  numEntries_++;
  numBlocks_ += blockCount;
  smartNotify(blockCount);
}

std::vector<TransferStats> &DirectorySourceQueue::getFailedSourceStats() {
  while (!sourceQueue_.empty()) {
    failedSourceStats_.emplace_back(
        std::move(sourceQueue_.top()->getTransferStats()));
    sourceQueue_.pop();
  }
  return failedSourceStats_;
}

std::vector<string> &DirectorySourceQueue::getFailedDirectories() {
  return failedDirectories_;
}

bool DirectorySourceQueue::enqueueFiles(std::vector<WdtFileInfo> &fileInfo) {
  for (auto &info : fileInfo) {
    if (threadCtx_->getAbortChecker()->shouldAbort()) {
      WLOG(ERROR) << "Directory transfer thread aborted";
      return false;
    }
    string fullPath = rootDir_ + info.fileName;
    if (info.fileSize < 0) {
      struct stat fileStat;
      if (stat(fullPath.c_str(), &fileStat) != 0) {
        WPLOG(ERROR) << "stat failed on path " << fullPath;
        TransferStats failedSourceStat(info.fileName);
        failedSourceStat.setLocalErrorCode(BYTE_SOURCE_READ_ERROR);
        {
          std::unique_lock<std::mutex> lock(mutex_);
          failedSourceStats_.emplace_back(std::move(failedSourceStat));
        }
        return false;
      }
      info.fileSize = fileStat.st_size;
    }
    createIntoQueue(fullPath, info);
  }
  return true;
}

bool DirectorySourceQueue::finished() const {
  std::lock_guard<std::mutex> lock(mutex_);
  return initFinished_ && sourceQueue_.empty();
}

int64_t DirectorySourceQueue::getCount() const {
  std::lock_guard<std::mutex> lock(mutex_);
  return numEntries_;
}

const PerfStatReport &DirectorySourceQueue::getPerfReport() const {
  return threadCtx_->getPerfReport();
}

std::pair<int64_t, ErrorCode> DirectorySourceQueue::getNumBlocksAndStatus()
    const {
  std::lock_guard<std::mutex> lock(mutex_);
  ErrorCode status = OK;
  if (!failedSourceStats_.empty() || !failedDirectories_.empty()) {
    // this function is called by active sender threads. The only way files or
    // directories can fail when sender threads are active is due to read
    // errors
    status = BYTE_SOURCE_READ_ERROR;
  }
  return std::make_pair(numBlocks_, status);
}

int64_t DirectorySourceQueue::getTotalSize() const {
  std::lock_guard<std::mutex> lock(mutex_);
  return totalFileSize_;
}

int64_t DirectorySourceQueue::getPreviouslySentBytes() const {
  return previouslySentBytes_;
}

bool DirectorySourceQueue::fileDiscoveryFinished() const {
  std::lock_guard<std::mutex> lock(mutex_);
  return initFinished_;
}

void DirectorySourceQueue::enqueueFilesToBeDeleted() {
  if (!deleteFiles_) {
    return;
  }
  if (!initFinished_ || previouslyTransferredChunks_.empty()) {
    // if the directory transfer has not finished yet or existing files list
    // has not yet been received, return
    return;
  }
  std::set<std::string> discoveredFiles;
  for (const SourceMetaData *metadata : sharedFileData_) {
    discoveredFiles.insert(metadata->relPath);
  }
  int64_t numFilesToBeDeleted = 0;
  for (auto &it : previouslyTransferredChunks_) {
    const std::string &fileName = it.first;
    if (discoveredFiles.find(fileName) != discoveredFiles.end()) {
      continue;
    }
    int64_t seqId = it.second.getSeqId();
    // extra file on the receiver side
    WLOG(INFO) << "Extra file " << fileName << " seq-id " << seqId
               << " on the receiver side, will be deleted";
    SourceMetaData *metadata = new SourceMetaData();
    metadata->relPath = fileName;
    metadata->size = 0;
    // we can reuse the previous seq-id
    metadata->seqId = seqId;
    metadata->allocationStatus = TO_BE_DELETED;
    sharedFileData_.emplace_back(metadata);
    // create a byte source with size and offset equal to 0
    std::unique_ptr<ByteSource> source =
        std::make_unique<FileByteSource>(metadata, 0, 0);
    sourceQueue_.push(std::move(source));
    numFilesToBeDeleted++;
  }
  numEntries_ += numFilesToBeDeleted;
  numBlocks_ += numFilesToBeDeleted;
  smartNotify(numFilesToBeDeleted);
}

std::unique_ptr<ByteSource> DirectorySourceQueue::getNextSource(
    ThreadCtx *callerThreadCtx, ErrorCode &status) {
  std::unique_ptr<ByteSource> source;
  while (true) {
    std::unique_lock<std::mutex> lock(mutex_);
    numWaiters_++;
    // notify if someone's waiting for previous batch to finish
    conditionPrevTransfer_.notify_all();
    while (sourceQueue_.empty() && !initFinished_) {
      conditionNotEmpty_.wait(lock);
    }
    numWaiters_--;
    if (!failedSourceStats_.empty() || !failedDirectories_.empty()) {
      status = ERROR;
    } else {
      status = OK;
    }
    if (sourceQueue_.empty()) {
      return nullptr;
    }
    // using const_cast since priority_queue returns a const reference
    source = std::move(
        const_cast<std::unique_ptr<ByteSource> &>(sourceQueue_.top()));
    sourceQueue_.pop();
    if (sourceQueue_.empty() && initFinished_) {
      conditionNotEmpty_.notify_all();
    }
    lock.unlock();
    WVLOG(1) << "got next source " << rootDir_ + source->getIdentifier()
             << " size " << source->getSize();
    // try to open the source
    if (source->open(callerThreadCtx) == OK) {
      lock.lock();
      numBlocksDequeued_++;
      return source;
    }
    source->close();
    // we need to lock again as we will be adding element to failedSourceStats
    // vector
    lock.lock();
    failedSourceStats_.emplace_back(std::move(source->getTransferStats()));
  }
}

void DirectorySourceQueue::waitForPreviousTransfer(
    std::chrono::milliseconds progressReportInterval,
    std::function<int64_t()> numActiveThreadsFn) {
  // don't call into numActiveThreadsFn with lock held
  int64_t numActiveThreads = numActiveThreadsFn();
  std::unique_lock<std::mutex> lock(mutex_);
  while (!sourceQueue_.empty() || numWaiters_ < numActiveThreads) {
    WLOG(INFO) << "Waiting for previous transfer."
               << "; Queue Size: " << sourceQueue_.size()
               << "; Active threads: " << numActiveThreads
               << "; Num Waiters: " << numWaiters_;
    conditionPrevTransfer_.wait_for(lock, progressReportInterval);
    // Release lock while calling into abort checker or numActiveThreadsFn
    lock.unlock();
    if (threadCtx_->getAbortChecker()->shouldAbort()) {
      WLOG(INFO) << "Aborting directory thread...";
      break;
    }
    numActiveThreads = numActiveThreadsFn();
    // re-acquire lock
    lock.lock();
  }
}

}  // namespace wdt
}  // namespace facebook
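
A minimal sketch of how a caller might drive this queue end to end, using only the public methods defined in the file above (the constructor, buildQueueAsynchronously(), getNextSource() and the ByteSource accessors). The helper name drainQueueOnce, the example root directory, and the assumption that a WdtOptions instance, an IAbortChecker implementation and a caller-side ThreadCtx are supplied by the surrounding WDT code are all illustrative; the real Sender wires these pieces up differently.

#include <wdt/util/DirectorySourceQueue.h>

#include <memory>
#include <thread>

namespace facebook {
namespace wdt {

// Hypothetical helper: discover a directory tree and pull every block once.
// A real sender would transfer the bytes of each block and return failed
// sources to the queue instead of just logging them.
void drainQueueOnce(const WdtOptions &options,
                    IAbortChecker const *abortChecker,
                    ThreadCtx *callerThreadCtx) {
  // "/tmp/wdt_example_src/" is a made-up root directory for illustration.
  DirectorySourceQueue queue(options, "/tmp/wdt_example_src/", abortChecker);
  // Discovery runs on its own thread; getNextSource() blocks until a block
  // is available or discovery finishes with an empty queue.
  std::thread discovery = queue.buildQueueAsynchronously();
  ErrorCode code = OK;
  while (std::unique_ptr<ByteSource> source =
             queue.getNextSource(callerThreadCtx, code)) {
    // getNextSource() already open()ed the source; a real sender would
    // stream source->getSize() bytes from the block offset here.
    WLOG(INFO) << "Would send " << source->getIdentifier() << " ("
               << source->getSize() << " bytes)";
    source->close();
  }
  discovery.join();
}

}  // namespace wdt
}  // namespace facebook

Note that blocks are handed out through smartNotify(), which wakes only as many waiting consumers as blocks were just added (falling back to notify_all once the count reaches numClientThreads_), so a pool of sender threads can block on getNextSource() without a thundering herd on every enqueue.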