util/FileCreator.h (67 lines of code) (raw):
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <wdt/Protocol.h>
#include <wdt/WdtConfig.h>
#include <wdt/util/CommonImpl.h>
#include <wdt/util/TransferLogManager.h>
#include <folly/SpinLock.h>
#include <glog/logging.h>
#include <condition_variable>
#include <map>
#include <mutex>
#include <string>
#include <unordered_set>
namespace facebook {
namespace wdt {
/**
* Utility class for creating/opening files for writing while
* creating subdirs automatically and only once in case multiple
* files are created relative to the rootDir directory.
*
* Path to rootDir doesn't need to have a trailing slash
* (it's added for you if missing)
*
* This class is thread-safe. (yeah!)
*/
class FileCreator {
public:
FileCreator(const std::string &rootDir, int numThreads,
TransferLogManager &transferLogManager, bool skipWrites)
: transferLogManager_(transferLogManager), skipWrites_(skipWrites) {
CHECK(!rootDir.empty());
// For creating root directory, we are using createDirRecursively.
// Since, this function adds rootDir to the path provided to it,
// we are setting the value of rootDir after the function call.
// So, createDirRecursively uses empty rootDir for this call.
std::string rootDirPath = rootDir;
addTrailingSlash(rootDirPath);
createDirRecursively(rootDirPath, false);
resetDirCache();
rootDir_ = rootDirPath;
threadConditionVariables_ = new std::condition_variable[numThreads];
}
virtual ~FileCreator() {
delete[] threadConditionVariables_;
}
/**
* This is used to open the file in block mode. If the current thread is the
* first one to try to open the file, then it allocates space using
* openAndSetSize function. Other threads wait for the first thread to finish
* and opens the file without setting size.
*
* @param threadCtx context of the calling thread
* @param blockDetails block-details
*
* @return file descriptor in case of success, -1 otherwise
*/
int openForBlocks(ThreadCtx &threadCtx, BlockDetails const *blockDetails);
/// reset internal directory cache
void resetDirCache() {
std::lock_guard<std::mutex> lock(mutex_);
createdDirs_.clear();
}
/// clears allocation status map, called after end of each session
void clearAllocationMap() {
std::unique_lock guard(lock_);
fileStatusMap_.clear();
}
private:
/**
* Opens the file and sets its size. If the existing file size is greater than
* required size, the file is truncated using ftruncate. Space is
* allocated using posix_fallocate.
*
* @param threadCtx context of the calling thread
* @param blockDetails block-details
*
* @return file descriptor in case of success, -1 otherwise
*/
int openAndSetSize(ThreadCtx &threadCtx, BlockDetails const *blockDetails);
/**
* Create a file and open for writing, recursively create subdirs.
* Subdirs are only created once due to createdDirs_ cache, but
* if an open fails where we assumed the directory already exists
* based on cache, we try creating the dir and open again before
* failing. Will not overwrite existing files unless overwrite option
* is set.
*
* @param threadCtx context of the calling thread
* @param relPath path relative to root dir
*
* @return file descriptor or -1 on error
*/
int createFile(ThreadCtx &threadCtx, const std::string &relPath);
/**
* Open existing file
*/
int openExistingFile(ThreadCtx &threadCtx, const std::string &relPath);
/**
* sets the size of the file. If the size is greater then the
* file is truncated using ftruncate. Space is allocated using fallocate.
*
* @param threadCtx context of the calling thread
* @param fd file descriptor
* @param fileSize size of the file
*
* @return true for success, false otherwise
*/
bool setFileSize(ThreadCtx &threadCtx, int fd, int64_t fileSize);
/**
* opens the file and sets it size. Called only for the first block to request
* opening a multi-block file. Sets the allocation status in fileStatusMap_
* and notifies other waiting thread.
*
* @param threadCtx context of the calling thread
* @param blockDetails block-details
*
* @return file descriptor or -1 on error
*/
int openForFirstBlock(ThreadCtx &threadCtx, BlockDetails const *blockDetails);
/// waits for allocation of a file to finish
bool waitForAllocationFinish(int allocatingThreadIndex, int64_t seqId);
/// appends a trailing / if not already there to path
static void addTrailingSlash(std::string &path);
/**
* Create directory recursively, populating cache. Cache is only
* used if force is false (but it's still populated in any case).
*
* @param dir dir to create recursively, should end with
* '/' and not start with '/'
* @param force whether to force trying to create/skip
* checking the cache
*
* @return true iff successful
*/
bool createDirRecursively(const std::string dir, bool force = false);
/// Check whether directory has been created/is in cache
bool dirCreated(const std::string &dir) {
std::lock_guard<std::mutex> lock(mutex_);
return createdDirs_.find(dir) != createdDirs_.end();
}
/// returns full path of a file
std::string getFullPath(const std::string &relPath);
/// root directory
std::string rootDir_;
/// directories created so far, relative to root
std::unordered_set<std::string> createdDirs_;
/// protects createdDirs_
std::mutex mutex_;
const int ALLOCATED{-1};
const int FAILED{-2};
/// map from file sequence id to allocation status. There are four possible
/// allocation status. NOT STARTED(no entry in the map), ALLOCATED(-1),
/// FAILED(-2) and IN_PROGRESS(map value is the index of the allocating
/// thread)
std::map<int64_t, int> fileStatusMap_;
/// transfer log manger used by receiver
TransferLogManager &transferLogManager_;
/// mutex to coordinate waiting among threads
std::mutex allocationMutex_;
/// array of condition_variables for different threads
std::condition_variable *threadConditionVariables_;
/// lock protecting fileStatusMap_
folly::SpinLock lock_;
// Set to prevent creating files
bool skipWrites_;
};
}
}