util/FileCreator.cpp (302 lines of code) (raw):
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <wdt/util/FileCreator.h>
#include <wdt/ErrorCodes.h>
#include <fcntl.h>
#include <folly/Conv.h>
#include <sys/stat.h>
#include <sys/types.h>
namespace facebook {
namespace wdt {
/**
 * Brings the file behind `fd` to the size expected by the transfer.
 *
 * If the file on disk is larger than `fileSize` it is truncated: down to
 * `fileSize` when pre-allocation is enabled, otherwise down to 0. When
 * pre-allocation is enabled and `fileSize` is non-zero, the space is then
 * reserved with posix_fallocate().
 *
 * @param threadCtx  context used to read the transfer options
 * @param fd         open descriptor of the file to resize
 * @param fileSize   expected size of the file, in bytes
 * @return true on success (including when nothing has to be allocated),
 *         false if fstat(), ftruncate() or posix_fallocate() failed
 */
bool FileCreator::setFileSize(ThreadCtx &threadCtx, int fd, int64_t fileSize) {
  struct stat fileStat;
  if (fstat(fd, &fileStat) != 0) {
    WPLOG(ERROR) << "fstat() failed for " << fd;
    return false;
  }
  if (fileStat.st_size > fileSize) {
    // existing file is larger than required
    int64_t sizeToTruncate =
        (threadCtx.getOptions().shouldPreallocateFiles() ? fileSize : 0);
    if (ftruncate(fd, sizeToTruncate) != 0) {
      WPLOG(ERROR) << "ftruncate() failed for " << fd << " " << sizeToTruncate;
      return false;
    }
  }
  if (fileSize == 0) {
    // nothing to reserve for an empty file
    return true;
  }
  if (!threadCtx.getOptions().shouldPreallocateFiles()) {
    // pre-allocation is disabled
    return true;
  }
#ifdef HAS_POSIX_FALLOCATE
  // posix_fallocate() reports failure through its return value, not errno
  int status = posix_fallocate(fd, 0, fileSize);
  if (status != 0) {
    WLOG(ERROR) << "fallocate() failed " << strerrorStr(status);
    return false;
  }
  return true;
#else
  WDT_CHECK(false) << "Should never reach here";
  // Keep every path of this non-void function returning a value, even if
  // the check above is not seen as noreturn by the compiler.
  return false;
#endif
}
/**
 * Opens (creating it when it does not exist yet) the file described by
 * `blockDetails` and resizes it to `blockDetails->fileSize`.
 *
 * A file whose allocation status is EXISTS_CORRECT_SIZE is only opened.
 * When log based resumption is enabled, the matching creation / resize /
 * invalidation entries are appended to the transfer log.
 *
 * @return the open file descriptor, or -1 if opening or resizing failed
 */
int FileCreator::openAndSetSize(ThreadCtx &threadCtx,
                                BlockDetails const *blockDetails) {
  const auto allocStatus = blockDetails->allocationStatus;
  const bool needsCreation = (allocStatus == NOT_EXISTS);
  const bool shrinksExisting = (allocStatus == EXISTS_TOO_LARGE);

  const int fd = needsCreation
                     ? createFile(threadCtx, blockDetails->fileName)
                     : openExistingFile(threadCtx, blockDetails->fileName);
  if (fd < 0) {
    return -1;
  }
  if (allocStatus == EXISTS_CORRECT_SIZE) {
    // already has the right size, nothing to resize or log
    return fd;
  }
  if (!setFileSize(threadCtx, fd, blockDetails->fileSize)) {
    close(fd);
    return -1;
  }
  if (!threadCtx.getOptions().isLogBasedResumption()) {
    return fd;
  }

  // Log based resumption: record what happened to the file.
  if (shrinksExisting) {
    WLOG(WARNING) << "File size smaller in the sender side "
                  << blockDetails->fileName
                  << ", marking previous transferred chunks as invalid";
    transferLogManager_.addFileInvalidationEntry(blockDetails->prevSeqId);
  }
  if (needsCreation || shrinksExisting) {
    transferLogManager_.addFileCreationEntry(
        blockDetails->fileName, blockDetails->seqId, blockDetails->fileSize);
    return fd;
  }
  // Only remaining possibility: the existing file had to grow.
  WDT_CHECK_EQ(EXISTS_TOO_SMALL, blockDetails->allocationStatus);
  transferLogManager_.addFileResizeEntry(blockDetails->seqId,
                                         blockDetails->fileSize);
  return fd;
}
/**
 * Performs the allocation for a file on behalf of the thread that claimed
 * it, publishes the outcome (ALLOCATED or FAILED) in fileStatusMap_ and
 * wakes up every thread waiting on this thread's condition variable.
 *
 * @return the descriptor returned by openAndSetSize() (negative on failure)
 */
int FileCreator::openForFirstBlock(ThreadCtx &threadCtx,
                                   BlockDetails const *blockDetails) {
  const int fd = openAndSetSize(threadCtx, blockDetails);
  {
    // Publish the result; an entry for this seqId must already exist.
    std::unique_lock guard(lock_);
    auto it = fileStatusMap_.find(blockDetails->seqId);
    WDT_CHECK(it != fileStatusMap_.end());
    if (fd >= 0) {
      it->second = ALLOCATED;
    } else {
      it->second = FAILED;
    }
  }
  // Notify while holding allocationMutex_, the mutex waiters block on.
  std::unique_lock<std::mutex> waitLock(allocationMutex_);
  threadConditionVariables_[threadCtx.getThreadIndex()].notify_all();
  return fd;
}
/**
 * Blocks until the allocation of the file identified by `seqId` has
 * reached a final state.
 *
 * @param allocatingThreadIndex  index of the condition variable to wait on
 *                               (the thread performing the allocation)
 * @param seqId                  key of the file in fileStatusMap_
 * @return true if the file ended up ALLOCATED, false if it FAILED
 */
bool FileCreator::waitForAllocationFinish(int allocatingThreadIndex,
                                          int64_t seqId) {
  std::unique_lock<std::mutex> waitLock(allocationMutex_);
  bool allocated = false;
  // Evaluated before every wait and after every wake-up, with
  // allocationMutex_ held; reads the status under lock_.
  const auto allocationSettled = [&]() {
    std::unique_lock guard(lock_);
    auto it = fileStatusMap_.find(seqId);
    WDT_CHECK(it != fileStatusMap_.end());
    allocated = (it->second == ALLOCATED);
    return allocated || it->second == FAILED;
  };
  threadConditionVariables_[allocatingThreadIndex].wait(waitLock,
                                                        allocationSettled);
  return allocated;
}
/**
 * Returns a descriptor on which a block of the file described by
 * `blockDetails` can be written, making sure the file is allocated by
 * exactly one thread.
 *
 * fileStatusMap_ maps a file's seqId either to ALLOCATED, to FAILED, or to
 * the index of the thread currently allocating the file:
 *  - TO_BE_DELETED files are unlinked and -1 is returned;
 *  - if no entry exists, this thread registers itself and performs the
 *    allocation through openForFirstBlock() (files already at the correct
 *    size are registered directly as ALLOCATED);
 *  - if another thread is allocating, this thread waits for it to finish;
 *  - once the file is allocated it is simply opened.
 *
 * @return an open file descriptor, or -1 on failure / deletion
 */
int FileCreator::openForBlocks(ThreadCtx &threadCtx,
                               BlockDetails const *blockDetails) {
  if (blockDetails->allocationStatus == TO_BE_DELETED) {
    const std::string path = getFullPath(blockDetails->fileName);
    int status;
    {
      PerfStatCollector statCollector(threadCtx, PerfStatReport::UNLINK);
      status = ::unlink(path.c_str());
    }
    if (status != 0) {
      WPLOG(ERROR) << "Failed to delete file " << path;
    } else {
      WLOG(INFO) << "Successfully deleted file " << path;
    }
    return -1;
  }
  // RAII guard so lock_ is released on every path, including an exception
  // thrown by the map insertions below.
  std::unique_lock guard(lock_);
  auto it = fileStatusMap_.find(blockDetails->seqId);
  if (blockDetails->allocationStatus == EXISTS_CORRECT_SIZE &&
      it == fileStatusMap_.end()) {
    it =
        fileStatusMap_
            .insert(std::make_pair(blockDetails->seqId, FileCreator::ALLOCATED))
            .first;
  }
  if (it == fileStatusMap_.end()) {
    // allocation has not started for this file
    fileStatusMap_.insert(
        std::make_pair(blockDetails->seqId, threadCtx.getThreadIndex()));
    // openForFirstBlock() takes lock_ itself, so release it first
    guard.unlock();
    return openForFirstBlock(threadCtx, blockDetails);
  }
  const auto statusOrThreadIdx = it->second;
  // do not hold lock_ while waiting or opening the file
  guard.unlock();
  if (statusOrThreadIdx == FAILED) {
    // allocation failed previously
    return -1;
  }
  if (statusOrThreadIdx != ALLOCATED) {
    // allocation in progress
    if (!waitForAllocationFinish(statusOrThreadIdx, blockDetails->seqId)) {
      return -1;
    }
  }
  return openExistingFile(threadCtx, blockDetails->fileName);
}
using std::string;
/**
 * Opens, write-only, a file that is expected to exist already.
 *
 * @param threadCtx   context providing the options and the perf stats sink
 * @param relPathStr  non-empty path relative to the root directory; must
 *                    neither start nor end with '/'
 * @return the open file descriptor, or -1 if open() failed
 */
int FileCreator::openExistingFile(ThreadCtx &threadCtx,
                                  const string &relPathStr) {
  // This should have been validated earlier and errored out
  // instead of crashing here
  WDT_CHECK(!relPathStr.empty());
  WDT_CHECK(relPathStr[0] != '/');
  WDT_CHECK(relPathStr.back() != '/');
  const string path = getFullPath(relPathStr);

  // O_CLOEXEC is only requested where the platform defines it
  const int cloexecFlag =
#ifdef O_CLOEXEC
      O_CLOEXEC;
#else
      0;
#endif
  const int openFlags =
      O_WRONLY | (threadCtx.getOptions().close_on_exec ? cloexecFlag : 0);

  int fd;
  {
    PerfStatCollector statCollector(threadCtx, PerfStatReport::FILE_OPEN);
    fd = open(path.c_str(), openFlags, 0644);
  }
  if (fd >= 0) {
    WVLOG(1) << "successfully opened file " << path;
    return fd;
  }
  WPLOG(ERROR) << "failed opening file " << path;
  return -1;
}
/**
 * Creates (or truncates, depending on the options) the file `relPathStr`
 * below the root directory, creating its parent directories first.
 *
 * Directory creation is retried once with `force`, and if open() still
 * fails for a file located in a sub-directory, the directories are forced
 * again and open() is retried once.
 *
 * @param threadCtx   context providing the options and the perf stats sink
 * @param relPathStr  non-empty path relative to the root directory; must
 *                    neither start nor end with '/'
 * @return the open file descriptor, or -1 on failure or when writes are
 *         skipped
 */
int FileCreator::createFile(ThreadCtx &threadCtx, const string &relPathStr) {
  WDT_CHECK(!relPathStr.empty());
  WDT_CHECK(relPathStr[0] != '/');
  WDT_CHECK(relPathStr.back() != '/');
  // Skip writes is turned on. We shouldn't be creating files
  if (skipWrites_) {
    return -1;
  }
  const string path = getFullPath(relPathStr);
  // Directory part of the relative path, trailing '/' included; stays empty
  // when the path contains no '/'.
  std::string dir;
  const auto lastSlash = relPathStr.rfind('/');
  if (lastSlash != string::npos) {
    dir.assign(relPathStr, 0, lastSlash + 1);
    bool dirSuccess1;
    {
      PerfStatCollector statCollector(threadCtx,
                                      PerfStatReport::DIRECTORY_CREATE);
      dirSuccess1 = createDirRecursively(dir);
    }
    if (!dirSuccess1) {
      // retry with force
      WLOG(ERROR) << "failed to create dir " << dir << " recursively, "
                  << "trying to force directory creation";
      bool dirSuccess2;
      {
        PerfStatCollector statCollector(threadCtx,
                                        PerfStatReport::DIRECTORY_CREATE);
        dirSuccess2 = createDirRecursively(dir, true /* force */);
      }
      if (!dirSuccess2) {
        WLOG(ERROR) << "failed to create dir " << dir << " recursively";
        return -1;
      }
    }
  }
  int openFlags = O_CREAT | O_WRONLY;
  if (threadCtx.getOptions().close_on_exec) {
#ifdef O_CLOEXEC
    openFlags |= O_CLOEXEC;
#endif
  }
  // When doing download resumption we sometime open files that do already
  // exist and we need to overwrite them anyway (files which have been
  // discarded from the log for some reason)
  if (threadCtx.getOptions().overwrite ||
      threadCtx.getOptions().enable_download_resumption) {
    // Make sure file size resumption will not get messed up if we
    // expect to create this file
    openFlags |= O_TRUNC;
  } else {
    // Make sure open will fail if we don't allow overwriting and
    // the file happens to already exist
    openFlags |= O_EXCL;
  }
  // open() wrapped in its perf stat collector; used for both attempts
  const auto openWithStats = [&]() {
    PerfStatCollector statCollector(threadCtx, PerfStatReport::FILE_OPEN);
    return open(path.c_str(), openFlags, 0644);
  };
  int res = openWithStats();
  if (res < 0) {
    if (dir.empty()) {
      // no directory to (re)create, so there is nothing to retry
      WPLOG(ERROR) << "failed creating file " << path;
      return -1;
    }
    WPLOG(ERROR) << "failed creating file " << path << ", trying to "
                 << "force directory creation";
    bool dirSuccess;
    {
      PerfStatCollector statCollector(threadCtx,
                                      PerfStatReport::DIRECTORY_CREATE);
      dirSuccess = createDirRecursively(dir, true /* force */);
    }
    if (!dirSuccess) {
      WLOG(ERROR) << "failed to create dir " << dir << " recursively";
      return -1;
    }
    res = openWithStats();
    if (res < 0) {
      WPLOG(ERROR) << "failed creating file " << path;
      return -1;
    }
  }
  WVLOG(1) << "successfully created file " << path;
  return res;
}
/**
 * Creates directory `dir` (relative to the root directory, with a trailing
 * '/') after recursively creating its parents, and records it in
 * createdDirs_.
 *
 * @param dir    relative directory path ending with '/'
 * @param force  when false, a directory for which dirCreated() returns true
 *               is not created again
 * @return true if the directory was made or already exists, false if
 *         mkdir() failed for another reason or writes are skipped
 */
bool FileCreator::createDirRecursively(const std::string dir, bool force) {
  // Skip writes is turned on. We shouldn't be creating files
  if (skipWrites_) {
    return false;
  }
  if (!force && dirCreated(dir)) {
    return true;
  }
  WDT_CHECK(dir.back() == '/');

  // Step back from the trailing '/' to just after the previous '/': the
  // first parentLen characters are then the parent directory (0 = none).
  int64_t parentLen = dir.size() - 1;
  for (; parentLen > 0 && dir[parentLen - 1] != '/'; --parentLen) {
  }
  if (parentLen > 0 &&
      !createDirRecursively(dir.substr(0, parentLen), force)) {
    return false;
  }

  const std::string fullDirPath = getFullPath(dir);
  const int code =
      mkdir(fullDirPath.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
  if (code == 0) {
    WLOG(INFO) << "made dir " << fullDirPath;
  } else if (errno == EEXIST || errno == EISDIR) {
    WLOG(INFO) << "dir already exists " << fullDirPath;
  } else {
    WPLOG(ERROR) << "failed to make directory " << fullDirPath;
    return false;
  }
  {
    std::lock_guard<std::mutex> lock(mutex_);
    createdDirs_.insert(dir);
  }
  return true;
}
/// Returns `relPath` prefixed with the root directory (plain concatenation).
std::string FileCreator::getFullPath(const std::string &relPath) {
  std::string fullPath(rootDir_);
  fullPath += relPath;
  return fullPath;
}
/**
 * Appends a '/' to `path` unless it already ends with one.
 *
 * An empty path is left untouched: calling back() on it would be undefined
 * behaviour, and appending would turn it into "/".
 */
/* static */
void FileCreator::addTrailingSlash(string &path) {
  if (path.empty()) {
    return;
  }
  if (path.back() != '/') {
    path.push_back('/');
    WVLOG(1) << "Added missing trailing / to " << path;
  }
}
} // namespace wdt
} // namespace facebook