util/DirectorySourceQueue.h (139 lines of code) (raw):
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <dirent.h>
#include <glog/logging.h>
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <limits>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
#include <wdt/Protocol.h>
#include <wdt/SourceQueue.h>
#include <wdt/WdtTransferRequest.h>
#include <wdt/util/FileByteSource.h>
namespace facebook {
namespace wdt {
/**
* SourceQueue that returns all the regular files under a given directory
* (recursively) as individual FileByteSource objects, sorted by decreasing
* file size.
*
* TODO: The actual building of the queue is specific to this implementation
* which may or may not make it easy to plug a different implementation
* (as shown by the current implementation of Sender.cpp)
*/
class DirectorySourceQueue : public SourceQueue {
 public:
  /**
   * Create a DirectorySourceQueue.
   * Call buildQueueSynchronously() or buildQueueAsynchronously() separately
   * to actually recurse over the root directory and gather files and sizes.
   *
   * @param options       options to use
   * @param rootDir       root directory to recurse on
   * @param abortChecker  abort checker
   */
  DirectorySourceQueue(const WdtOptions &options, const std::string &rootDir,
                       IAbortChecker const *abortChecker);

  /**
   * Recurse over given root directory, gather data about regular files and
   * initialize internal data structures. getNextSource() will return sources
   * as this call discovers them.
   *
   * This should only be called once. Subsequent calls will do nothing and
   * return false. In case it is called from multiple threads, one of them
   * will do initialization while the other calls will fail.
   *
   * This is synchronous in the succeeding thread - it will block until
   * the directory is completely discovered. Use buildQueueAsynchronously()
   * for async fetch from parallel thread.
   *
   * @return  true iff initialization was successful and hasn't
   *          been done before
   */
  bool buildQueueSynchronously();

  /**
   * Starts a new thread to build the queue @see buildQueueSynchronously()
   *
   * @return  the created thread (to be joined if needed)
   */
  std::thread buildQueueAsynchronously();

  /// @return true iff all regular files under root dir have been consumed
  bool finished() const override;

  /// @return true if all the files have been discovered, false otherwise
  bool fileDiscoveryFinished() const;

  /**
   * @param callerThreadCtx  context of the calling thread
   * @param status           this variable is set to the status of the
   *                         transfer
   *
   * @return  next FileByteSource to consume or nullptr when finished
   */
  std::unique_ptr<ByteSource> getNextSource(ThreadCtx *callerThreadCtx,
                                            ErrorCode &status) override;

  /// @return total number of files processed/enqueued
  int64_t getCount() const override;

  /// @return total size of files processed/enqueued
  int64_t getTotalSize() const override;

  /// @return total number of blocks and status of the transfer
  std::pair<int64_t, ErrorCode> getNumBlocksAndStatus() const;

  /// @return perf report
  const PerfStatReport &getPerfReport() const;

  /**
   * Sets regex representing files to include for transfer
   *
   * @param includePattern  file inclusion regex
   */
  void setIncludePattern(const std::string &includePattern);

  /**
   * Sets regex representing files to exclude for transfer
   *
   * @param excludePattern  file exclusion regex
   */
  void setExcludePattern(const std::string &excludePattern);

  /**
   * Sets regex representing directories to exclude for transfer
   *
   * @param pruneDirPattern  directory exclusion regex
   */
  void setPruneDirPattern(const std::string &pruneDirPattern);

  /**
   * Sets the number of consumer threads for this queue. Used as threshold
   * between notify and notifyAll.
   *
   * @param numClientThreads  number of consumer threads
   */
  void setNumClientThreads(int64_t numClientThreads) {
    numClientThreads_ = numClientThreads;
  }

  /**
   * Sets the count and trigger for files to open during discovery
   * (negative is keep opening until we run out of fd, positive is how
   * many files we can still open, 0 is stop opening files).
   *
   * The value is stored in a 32 bit field, so inputs outside the int32_t
   * range are saturated to the nearest representable value. This keeps the
   * sign (and therefore the negative/positive meaning above) intact instead
   * of letting the narrowing conversion wrap to an unrelated value.
   *
   * @param openFilesDuringDiscovery  count/trigger as described above
   */
  void setOpenFilesDuringDiscovery(int64_t openFilesDuringDiscovery) {
    constexpr int64_t kLowest = std::numeric_limits<int32_t>::min();
    constexpr int64_t kHighest = std::numeric_limits<int32_t>::max();
    const int64_t saturated =
        std::max(kLowest, std::min(kHighest, openFilesDuringDiscovery));
    openFilesDuringDiscovery_ = static_cast<int32_t>(saturated);
  }

  /**
   * If setOpenFilesDuringDiscovery is not zero, open files using direct
   * mode.
   *
   * @param directReads  whether to use direct reads
   */
  void setDirectReads(bool directReads) {
    directReads_ = directReads;
  }

  /// enable extra file deletion in the receiver side
  void enableFileDeletion() {
    deleteFiles_ = true;
  }

  /**
   * Stat the FileInfo input files (if their size aren't already specified)
   * and insert them in the queue
   *
   * @param fileInfo  files to be transferred
   */
  void setFileInfo(const std::vector<WdtFileInfo> &fileInfo);

  /**
   * Sets the generator function to invoke to get more files to send
   *
   * @param gen  file info generator
   */
  void setFileInfoGenerator(WdtTransferRequest::FileInfoGenerator gen);

  /// @param blockSizeMbytes block size in Mbytes
  void setBlockSizeMbytes(int64_t blockSizeMbytes);

  /// Get the file info in this directory queue
  std::vector<WdtFileInfo> getFileInfo() const;

  /**
   * Sets whether to follow symlink or not
   *
   * @param followSymlinks  whether to follow symlink or not
   */
  void setFollowSymlinks(bool followSymlinks);

  /**
   * Sets chunks which were sent in some previous transfer
   *
   * @param previouslyTransferredChunks  previously sent chunk info
   */
  void setPreviouslyReceivedChunks(
      std::vector<FileChunksInfo> &previouslyTransferredChunks);

  /**
   * Returns sources to the queue, checks for fail/retries, doesn't increment
   * numentries
   *
   * @param sources  sources to be returned to the queue
   */
  void returnToQueue(std::vector<std::unique_ptr<ByteSource>> &sources);

  /**
   * Returns a source to the queue, checks for fail/retries, doesn't increment
   * numentries
   *
   * @param source  source to be returned to the queue
   */
  void returnToQueue(std::unique_ptr<ByteSource> &source);

  /**
   * Returns list of files which were not transferred. It empties the queue
   * and adds queue entries to the failed file list. This function should be
   * called after all the sending threads have finished execution
   *
   * @return  stats for failed sources
   */
  std::vector<TransferStats> &getFailedSourceStats();

  /// @return returns list of directories which could not be opened
  std::vector<std::string> &getFailedDirectories();

  /// @return number of bytes previously sent
  int64_t getPreviouslySentBytes() const;

  ~DirectorySourceQueue() override;

  /// @return discovered files metadata
  std::vector<SourceMetaData *> &getDiscoveredFilesMetaData();

  /// Returns the time it took to traverse the directory tree
  double getDirectoryTime() const {
    return directoryTime_;
  }

  /**
   * Allows to change the root directory, must not be empty, trailing
   * slash is automatically added if missing. Can be relative.
   * If follow symlink is set the directory will be resolved as absolute
   * path.
   *
   * @return  true if successful, false on error (logged)
   */
  bool setRootDir(const std::string &newRootDir);

  /**
   * Allows the caller to block until all the previous transfers have
   * finished, before invoking fileInfoGenerator_ to get the next batch.
   * NOTE: This uses numActiveThreadsFn() to get the number of clients pulling
   * from the queue and the size of queue to determine if transfers have
   * finished.
   *
   * @param progressReportInterval  report progress every
   *                                progressReportInterval milliseconds.
   * @param numActiveThreadsFn      Func to get number of active threads
   */
  void waitForPreviousTransfer(std::chrono::milliseconds progressReportInterval,
                               std::function<int64_t()> numActiveThreadsFn);

 private:
  /**
   * Resolves a symlink.
   *
   * @return  realpath or empty string on error (logged)
   */
  std::string resolvePath(const std::string &path);

  /**
   * Traverse rootDir_ to gather files and sizes to enqueue
   *
   * @return  true on success, false on error
   */
  bool explore();

  /**
   * Stat the input files and populate queue
   *
   * @return  true on success, false on error
   */
  bool enqueueFiles(std::vector<WdtFileInfo> &fileInfo);

  /**
   * Initial creation from either explore or enqueue files, uses
   * createIntoQueueInternal to create blocks
   *
   * @param fullPath  full path of the file to be added
   * @param fileInfo  Information about file
   */
  void createIntoQueue(const std::string &fullPath, WdtFileInfo &fileInfo);

  /**
   * Initial creation from either explore or enqueue files - always increment
   * numentries. Lock must be held before calling this.
   *
   * @param metadata  file meta-data
   */
  void createIntoQueueInternal(SourceMetaData *metadata);

  /**
   * When adding multiple files, we have the option of using notify_one
   * multiple times or notify_all once. Depending on number of added sources,
   * this function uses either notify_one or notify_all
   *
   * @param addedSource  number of sources added
   */
  void smartNotify(int32_t addedSource);

  /// Removes all elements from the source queue
  void clearSourceQueue();

  /// If file deletion is enabled, extra files to be deleted are enqueued. This
  /// method should be called while holding the lock
  void enqueueFilesToBeDeleted();

  std::unique_ptr<ThreadCtx> threadCtx_{nullptr};

  /// root directory to recurse on if fileInfo_ is empty
  std::string rootDir_;

  /// regex representing directories to prune
  std::string pruneDirPattern_;

  /// regex representing files to include
  std::string includePattern_;

  /// regex representing files to exclude
  std::string excludePattern_;

  /// Block size in mb
  int64_t blockSizeMbytes_{0};

  /// List of files to enqueue instead of recursing over rootDir_.
  std::vector<WdtFileInfo> fileInfo_;

  /// A generator function to invoke to get more files to send
  WdtTransferRequest::FileInfoGenerator fileInfoGenerator_;

  /// protects
  /// initCalled_/initFinished_/sourceQueue_/failedSourceStats_/numWaiters_
  mutable std::mutex mutex_;

  /// condition variable indicating sourceQueue_ is not empty
  mutable std::condition_variable conditionNotEmpty_;

  /// condition variable indicating previous batch transfer has finished i.e.
  /// queue is empty and all client threads are waiting.
  mutable std::condition_variable conditionPrevTransfer_;

  /// Indicates whether init() has been called to prevent multiple calls
  /// (NOTE(review): no init() is declared here; presumably this refers to
  /// buildQueueSynchronously() - confirm against the implementation)
  bool initCalled_{false};

  /// Indicates whether call to init() has finished
  bool initFinished_{false};

  /**
   * Ordering used by sourceQueue_. Returns true when source1 must come out
   * of the queue AFTER source2 (std::priority_queue pops the element that
   * compares greatest). Criteria, in order of precedence:
   *   1. sources whose allocationStatus is TO_BE_DELETED come out first
   *   2. fewer failed attempts come out first
   *   3. larger size comes out first
   *   4. smaller offset comes out first
   *   5. smaller identifier comes out first
   */
  struct SourceComparator {
    bool operator()(const std::unique_ptr<ByteSource> &source1,
                    const std::unique_ptr<ByteSource> &source2) const {
      const bool toBeDeleted1 =
          (source1->getMetaData().allocationStatus == TO_BE_DELETED);
      const bool toBeDeleted2 =
          (source2->getMetaData().allocationStatus == TO_BE_DELETED);
      if (toBeDeleted1 != toBeDeleted2) {
        // always send files to be deleted first
        return toBeDeleted2;
      }
      const auto retryCount1 = source1->getTransferStats().getFailedAttempts();
      const auto retryCount2 = source2->getTransferStats().getFailedAttempts();
      if (retryCount1 != retryCount2) {
        return retryCount1 > retryCount2;
      }
      if (source1->getSize() != source2->getSize()) {
        return source1->getSize() < source2->getSize();
      }
      if (source1->getOffset() != source2->getOffset()) {
        return source1->getOffset() > source2->getOffset();
      }
      // final tie-break so that the ordering between distinct sources is
      // fully determined
      return source1->getIdentifier() > source2->getIdentifier();
    }
  };

  /**
   * Priority queue of sources. Sources marked TO_BE_DELETED are served first.
   * The rest are ordered by increasing failedAttempts, then by decreasing
   * size. If sizes are equal (always for blocks), sources are ordered by
   * offset and finally by identifier. This way, we ensure that all the
   * threads in the receiver side are not writing to the same file at the
   * same time.
   */
  std::priority_queue<std::unique_ptr<ByteSource>,
                      std::vector<std::unique_ptr<ByteSource>>,
                      SourceComparator>
      sourceQueue_;

  /**
   * number of threads waiting on the queue
   */
  int64_t numWaiters_{0};

  /// Transfer stats for sources which are not transferred
  std::vector<TransferStats> failedSourceStats_;

  /// directories which could not be opened
  std::vector<std::string> failedDirectories_;

  /// Total number of files that have passed through the queue
  int64_t numEntries_{0};

  /// Seq-id of the next file to be inserted into the queue
  /// first valid seq is 1 so we can use 0 as uninitialized/invalid in
  /// protocol.h
  int64_t nextSeqId_{1};

  /// Total number of blocks that have passed through the queue. Even when
  /// blocks are actually disabled, our code internally treats files like
  /// single blocks. So, numBlocks_ >= numEntries_ (the number of files).
  int64_t numBlocks_{0};

  /// Total size of entries/files that have passed through the queue
  int64_t totalFileSize_{0};

  /// Number of blocks dequeued
  int64_t numBlocksDequeued_{0};

  /// Whether to follow symlinks or not
  bool followSymlinks_{false};

  /// Shared file data. These are used during transfer to add blocks
  /// contribution
  std::vector<SourceMetaData *> sharedFileData_;

  /// A map from relative file name to previously received chunks
  std::unordered_map<std::string, FileChunksInfo> previouslyTransferredChunks_;

  /// Stores the time difference between the start and the end of the
  /// traversal of directory
  double directoryTime_{0};

  /// Number of bytes previously sent
  int64_t previouslySentBytes_{0};

  /**
   * Count and trigger of files to open (negative is keep opening until we run
   * out of fd, positive is how many files we can still open, 0 is stop opening
   * files).
   * Sender only (Receiver download resumption directory discovery should not
   * open files).
   * Written through setOpenFilesDuringDiscovery(), which saturates its 64 bit
   * argument to this field's 32 bit range.
   */
  int32_t openFilesDuringDiscovery_{0};

  /// Should the WdtFileInfo created during discovery have direct read mode set
  bool directReads_{false};

  /// Number of files opened
  int64_t numFilesOpened_{0};

  /// Number of files opened with odirect
  int64_t numFilesOpenedWithDirect_{0};

  /// Number of consumer threads (to tell between notify/notifyall)
  int64_t numClientThreads_{1};

  /// Should we explore or use fileInfo
  bool exploreDirectory_{true};

  /// delete extra files in the receiver side
  bool deleteFiles_{false};
};
}
}