util/FileWriter.cpp (173 lines of code) (raw):
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#include <wdt/util/FileWriter.h>
#include <wdt/util/CommonImpl.h>
#include <fcntl.h>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <sys/types.h>
#include <cerrno>
namespace facebook {
namespace wdt {
FileWriter::~FileWriter() {
  // Callers are expected to invoke sync() and close() themselves so that they
  // can inspect the returned error code; in that case there is nothing left to
  // do here.
  if (isClosed()) {
    return;
  }
  // Safety net: the descriptor is still open, so report it and close it now.
  WLOG(ERROR) << "File " << blockDetails_->fileName
              << " was not closed and needed to be closed in the dtor";
  close();
}
// Opens the destination file for this block and positions the descriptor at
// the block's offset.
//
// Returns OK when writes are skipped, when the block is marked TO_BE_DELETED
// (no descriptor is expected in that case), or when the file was opened and,
// if needed, the seek succeeded. Returns FILE_WRITE_ERROR when the open or
// the seek fails.
ErrorCode FileWriter::open() {
  if (threadCtx_.getOptions().skip_writes) {
    return OK;
  }
  // TODO: consider a working optimization for small files
  fd_ = fileCreator_->openForBlocks(threadCtx_, blockDetails_);
  if (blockDetails_->allocationStatus == TO_BE_DELETED) {
    WDT_CHECK_EQ(-1, fd_);
    return OK;
  }
  if (fd_ == -1) {
    WLOG(ERROR) << "File open/seek failed for " << blockDetails_->fileName;
    return FILE_WRITE_ERROR;
  }
  // A seek is only required for a valid descriptor and a non-zero offset.
  const bool seekRequired = (fd_ >= 0) && (blockDetails_->offset > 0);
  if (!seekRequired) {
    return OK;
  }
  int64_t seekResult;
  {
    // Time only the lseek() call itself.
    PerfStatCollector statCollector(threadCtx_, PerfStatReport::FILE_SEEK);
    seekResult = lseek(fd_, blockDetails_->offset, SEEK_SET);
  }
  if (seekResult >= 0) {
    return OK;
  }
  WPLOG(ERROR) << "Unable to seek to " << blockDetails_->offset << " for "
               << blockDetails_->fileName;
  // Do not leave a descriptor open at the wrong position.
  close();
  return FILE_WRITE_ERROR;
}
// Flushes the written data for this block.
//
// Does nothing (returns OK) when no descriptor is open. Otherwise:
//  - calls fsync() when the fsync option is set or log based resumption is in
//    use;
//  - when HAS_POSIX_FADVISE is defined and skip_fadvise is not set, issues
//    POSIX_FADV_DONTNEED for the range starting at the block's offset and
//    spanning the block's dataSize.
//
// Returns FILE_WRITE_ERROR if either call fails, OK otherwise.
ErrorCode FileWriter::sync() {
  if (fd_ < 0) {
    // File was either never opened or already closed
    return OK;
  }
  const auto &options = threadCtx_.getOptions();
  if (options.fsync || options.isLogBasedResumption()) {
    PerfStatCollector statCollector(threadCtx_, PerfStatReport::FSYNC_STATS);
    if (::fsync(fd_) < 0) {
      WPLOG(ERROR) << "Unable to fsync() fd " << fd_;
      return FILE_WRITE_ERROR;
    }
  }
#ifdef HAS_POSIX_FADVISE
  if (!options.skip_fadvise) {
    PerfStatCollector statCollector(threadCtx_, PerfStatReport::FADVISE);
    const int fadviseError =
        posix_fadvise(fd_, blockDetails_->offset, blockDetails_->dataSize,
                      POSIX_FADV_DONTNEED);
    if (fadviseError != 0) {
      // posix_fadvise() reports failure through its return value and does not
      // set errno. Store the code in errno so that the errno-based log below
      // (WPLOG is presumably a PLOG wrapper) describes the actual failure
      // instead of a stale value.
      errno = fadviseError;
      WPLOG(ERROR) << "posix_fadvise failed for " << blockDetails_->fileName
                   << " " << blockDetails_->offset << " "
                   << blockDetails_->dataSize;
      return FILE_WRITE_ERROR;
    }
  }
#endif
  return OK;
}
// Closes the descriptor if one is open. The descriptor is marked as closed
// (fd_ = -1) whether or not ::close() succeeds.
//
// Returns FILE_WRITE_ERROR if ::close() fails, OK otherwise (including when
// there was nothing to close).
ErrorCode FileWriter::close() {
  if (fd_ < 0) {
    return OK;
  }
  PerfStatCollector statCollector(threadCtx_, PerfStatReport::FILE_CLOSE);
  ErrorCode result = OK;
  if (::close(fd_) != 0) {
    // Log while fd_ still holds the descriptor number.
    WPLOG(ERROR) << "Unable to close fd " << fd_;
    result = FILE_WRITE_ERROR;
  }
  fd_ = -1;
  return result;
}
// Reports whether this writer currently holds no open descriptor; close()
// resets fd_ to -1, so any negative value counts as closed.
bool FileWriter::isClosed() {
  const bool hasOpenDescriptor = (fd_ >= 0);
  return !hasOpenDescriptor;
}
// Writes `size` bytes from `buf` to the open descriptor.
//
// Short writes are continued until all bytes are written and writes
// interrupted by a signal (EINTR) are retried. After the data is written,
// syncFileRange() is invoked; the sync is forced when this call completes the
// block (totalWritten_ + size equals the block's dataSize).
//
// When the skip_writes option is set nothing is written, but totalWritten_ is
// still advanced by `size`.
//
// Must not be called for a block whose allocation status is TO_BE_DELETED.
//
// Returns FILE_WRITE_ERROR if ::write() or the range sync fails, OK otherwise.
ErrorCode FileWriter::write(char *buf, int64_t size) {
  WDT_CHECK_NE(TO_BE_DELETED, blockDetails_->allocationStatus);
  auto &options = threadCtx_.getOptions();
  if (!options.skip_writes) {
    int64_t count = 0;
    while (count < size) {
      int64_t written;
      int writeErrno = 0;
      {
        PerfStatCollector statCollector(threadCtx_, PerfStatReport::FILE_WRITE);
        written = ::write(fd_, buf + count, size - count);
        if (written == -1) {
          // Snapshot errno right after the failing call: the collector's
          // destructor runs before the checks below, and errno is only
          // reliable immediately after the call that set it.
          writeErrno = errno;
        }
      }
      if (written == -1) {
        if (writeErrno == EINTR) {
          WVLOG(1) << "Disk write interrupted, retrying "
                   << blockDetails_->fileName;
          continue;
        }
        // Restore the saved value for the errno-based log below.
        errno = writeErrno;
        WPLOG(ERROR) << "File write failed for " << blockDetails_->fileName
                     << " fd : " << fd_ << " " << written << " " << count
                     << " " << size;
        return FILE_WRITE_ERROR;
      }
      count += written;
    }
    WVLOG(1) << "Successfully written " << count << " bytes to fd " << fd_
             << " for file " << blockDetails_->fileName;
    const bool finished = ((totalWritten_ + size) == blockDetails_->dataSize);
    if (!syncFileRange(count, finished /*forced*/)) {
      return FILE_WRITE_ERROR;
    }
  }
  totalWritten_ += size;
  return OK;
}
// Accounts for `written` newly written bytes and, when due, asks the kernel to
// start writing the not-yet-synced range to disk.
//
// Only active when HAS_SYNC_FILE_RANGE is defined; otherwise this is a no-op
// that returns true. A negative disk_sync_interval_mb option disables it as
// well. A sync is issued when `forced` is true or when the bytes accumulated
// since the last sync exceed the configured interval, provided at least one
// byte is pending. After a successful sync the pending range is consumed:
// nextSyncOffset_ advances and writtenSinceLastSync_ is reset to zero.
//
// When HAS_POSIX_FADVISE is defined and skip_fadvise is not set, the synced
// range is additionally passed to posix_fadvise(POSIX_FADV_DONTNEED).
//
// Returns false if sync_file_range() or posix_fadvise() fails, true otherwise.
bool FileWriter::syncFileRange(int64_t written, bool forced) {
#ifdef HAS_SYNC_FILE_RANGE
  const WdtOptions &options = threadCtx_.getOptions();
  if (options.disk_sync_interval_mb < 0) {
    return true;
  }
  const int64_t syncIntervalBytes = options.disk_sync_interval_mb * 1024 * 1024;
  writtenSinceLastSync_ += written;
  if (writtenSinceLastSync_ == 0) {
    // no need to sync
    WVLOG(1) << "skipping syncFileRange for " << blockDetails_->fileName
             << ". Data written " << written
             << " sync forced = " << std::boolalpha << forced;
    return true;
  }
  if (forced || writtenSinceLastSync_ > syncIntervalBytes) {
    // sync_file_range with flag SYNC_FILE_RANGE_WRITE is an asynchronous
    // operation. So, this is not that costly. Source :
    // http://yoshinorimatsunobu.blogspot.com/2014/03/how-syncfilerange-really-works.html
    int status;
    {
      PerfStatCollector statCollector(threadCtx_,
                                      PerfStatReport::SYNC_FILE_RANGE);
      status = sync_file_range(fd_, nextSyncOffset_, writtenSinceLastSync_,
                               SYNC_FILE_RANGE_WRITE);
    }
    if (status != 0) {
      WPLOG(ERROR) << "sync_file_range() failed for "
                   << blockDetails_->fileName << " fd " << fd_;
      return false;
    }
#ifdef HAS_POSIX_FADVISE
    if (!options.skip_fadvise) {
      int fadviseError;
      {
        PerfStatCollector statCollector(threadCtx_, PerfStatReport::FADVISE);
        fadviseError = posix_fadvise(fd_, nextSyncOffset_,
                                     writtenSinceLastSync_,
                                     POSIX_FADV_DONTNEED);
      }
      if (fadviseError != 0) {
        // posix_fadvise() returns the error number instead of setting errno.
        // Store it in errno so that the errno-based log below (WPLOG is
        // presumably a PLOG wrapper) reports the real cause.
        errno = fadviseError;
        WPLOG(ERROR) << "posix_fadvise failed for "
                     << blockDetails_->fileName
                     << " " << nextSyncOffset_ << " "
                     << writtenSinceLastSync_;
        return false;
      }
    }
#endif
    WVLOG(1) << "file range [" << nextSyncOffset_ << " "
             << writtenSinceLastSync_ << "] synced for file "
             << blockDetails_->fileName;
    // The pending range has been handed to the kernel; start a new one.
    nextSyncOffset_ += writtenSinceLastSync_;
    writtenSinceLastSync_ = 0;
  }
#endif
  return true;
}
}
}