util/FileByteSource.cpp (176 lines of code) (raw):

/**
 * Copyright (c) 2014-present, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */
#include <wdt/util/FileByteSource.h>

#include <fcntl.h>
#include <glog/logging.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <algorithm>
#include <cerrno>

namespace facebook {
namespace wdt {

/**
 * Opens `filename` read-only.
 *
 * When `isDirectReads` is set, O_DIRECT is added to the open flags if the
 * platform defines it. Otherwise F_NOCACHE is applied to the opened fd if that
 * is defined. If neither exists, this WDT_CHECK-fails.
 *
 * O_CLOEXEC is added when the `close_on_exec` option is set and the platform
 * defines it. The open(2) call is timed under PerfStatReport::FILE_OPEN.
 *
 * @param threadCtx     supplies options and perf-stat collection
 * @param filename      path of the file to open
 * @param isDirectReads request page-cache-bypassing reads
 * @return the new fd, or a negative value on failure (already logged)
 */
int FileUtil::openForRead(ThreadCtx &threadCtx, const std::string &filename,
                          const bool isDirectReads) {
  int openFlags = O_RDONLY;
  if (isDirectReads) {
#ifdef O_DIRECT
    // no need to change any flags if we are using F_NOCACHE
    openFlags |= O_DIRECT;
#endif
  }
  if (threadCtx.getOptions().close_on_exec) {
#ifdef O_CLOEXEC
    openFlags |= O_CLOEXEC;
#endif
  }
  int fd;
  {
    PerfStatCollector statCollector(threadCtx, PerfStatReport::FILE_OPEN);
    fd = ::open(filename.c_str(), openFlags);
  }
  if (fd >= 0) {
    if (isDirectReads) {
#ifndef O_DIRECT
#ifdef F_NOCACHE
      WVLOG(1) << "O_DIRECT not found, using F_NOCACHE instead "
               << "for " << filename;
      int ret = fcntl(fd, F_NOCACHE, 1);
      if (ret) {
        WPLOG(ERROR) << "Not able to set F_NOCACHE";
      }
#else
      WDT_CHECK(false)
          << "Direct read enabled, but both O_DIRECT and F_NOCACHE not defined "
          << filename;
#endif
#endif
    }
  } else {
    WPLOG(ERROR) << "Error opening file " << filename;
  }
  return fd;
}

/**
 * Creates a source covering `size` bytes of the file described by `metadata`,
 * starting at byte `offset`. Nothing is opened here; see open().
 */
FileByteSource::FileByteSource(SourceMetaData *metadata, int64_t size,
                               int64_t offset)
    : metadata_(metadata),
      size_(size),
      offset_(offset),
      bytesRead_(0),
      alignedReadNeeded_(false) {
  transferStats_.setId(getIdentifier());
}

/**
 * Prepares this source for reading on behalf of `threadCtx`.
 *
 * - Returns OK without doing anything if the file is marked TO_BE_DELETED.
 * - Otherwise resets the read position and closes any previously held fd.
 * - Reuses `metadata_->fd` when it is already open; such an fd is never
 *   closed by close(). If it is not open, opens `metadata_->fullPath`
 *   through FileUtil::openForRead.
 *
 * The result is recorded in transferStats_ and returned. It is
 * BYTE_SOURCE_READ_ERROR if the open failed.
 */
ErrorCode FileByteSource::open(ThreadCtx *threadCtx) {
  if (metadata_->allocationStatus == TO_BE_DELETED) {
    return OK;
  }
  // NOTE(review): bytesRead_ is reset *before* close(). clearPageCache() skips
  // when bytesRead_ is 0, so pages from a previous pass are not advised away
  // on re-open. Presumably intentional (a retry re-reads them); confirm.
  bytesRead_ = 0;
  this->close();
  threadCtx_ = threadCtx;
  ErrorCode errCode = OK;
  const bool isDirectReads = metadata_->directReads;
  WVLOG(1) << "Reading in direct mode " << isDirectReads;
  if (isDirectReads) {
#ifdef O_DIRECT
    // O_DIRECT reads must start on disk-block boundaries; see read().
    alignedReadNeeded_ = true;
#endif
  }
  if (metadata_->fd >= 0) {
    WVLOG(1) << "metadata already has fd, no need to open "
             << getIdentifier();
    fd_ = metadata_->fd;
  } else {
    fd_ = FileUtil::openForRead(*threadCtx_, metadata_->fullPath,
                                isDirectReads);
    if (fd_ < 0) {
      errCode = BYTE_SOURCE_READ_ERROR;
    }
  }
  transferStats_.setLocalErrorCode(errCode);
  return errCode;
}

/** Moves the start of the range forward by `numBytes`, shrinking it to match. */
void FileByteSource::advanceOffset(int64_t numBytes) {
  offset_ += numBytes;
  size_ -= numBytes;
}

/**
 * Reads the next chunk of this source into the thread's buffer.
 *
 * On success, returns a pointer into that buffer and sets `size` to the
 * number of valid bytes. The bytes come from file position
 * `offset_ + bytesRead_` (the value before the call).
 *
 * Returns nullptr with `size == 0` when already finished or in error, on a
 * pread failure, or on an unexpected EOF. The last two also close the fd and
 * record BYTE_SOURCE_READ_ERROR.
 */
char *FileByteSource::read(int64_t &size) {
  size = 0;
  if (hasError() || finished()) {
    return nullptr;
  }
  const Buffer *buffer = threadCtx_->getBuffer();
  // For aligned (O_DIRECT) reads, start pread at the enclosing disk-block
  // boundary. The first offsetRemainder bytes of the buffer are then skipped.
  int64_t offsetRemainder = 0;
  if (alignedReadNeeded_) {
    offsetRemainder = (offset_ + bytesRead_) % kDiskBlockSize;
  }
  const int64_t logicalRead = std::min<int64_t>(
      buffer->getSize() - offsetRemainder, size_ - bytesRead_);
  int64_t physicalRead = logicalRead;
  if (alignedReadNeeded_) {
    // Round the request up to a whole number of disk blocks.
    physicalRead = ((logicalRead + offsetRemainder + kDiskBlockSize - 1) /
                    kDiskBlockSize) *
                   kDiskBlockSize;
  }
  const int64_t seekPos = (offset_ + bytesRead_) - offsetRemainder;
  // ssize_t, not int: pread's return type, so large reads are not truncated.
  ssize_t numRead;
  {
    PerfStatCollector statCollector(*threadCtx_, PerfStatReport::FILE_READ);
    numRead = ::pread(fd_, buffer->getData(), physicalRead, seekPos);
  }
  if (numRead < 0) {
    WPLOG(ERROR) << "Failure while reading file " << metadata_->fullPath
                 << " need align " << alignedReadNeeded_ << " physicalRead "
                 << physicalRead << " offset " << offset_ << " seekPos "
                 << seekPos << " offsetRemainder " << offsetRemainder
                 << " bytesRead " << bytesRead_;
    this->close();
    transferStats_.setLocalErrorCode(BYTE_SOURCE_READ_ERROR);
    return nullptr;
  }
  // Getting no bytes at or past (offset_ + bytesRead_) means EOF arrived
  // before data we still expect. Presumably the file shrank after the metadata
  // was taken. In aligned mode this also catches 0 < numRead <= offsetRemainder,
  // which would otherwise produce a zero or negative `size` and move
  // bytesRead_ backwards. Record the error like the pread-failure path does,
  // instead of returning nullptr with no error recorded.
  if (numRead <= offsetRemainder) {
    WLOG(ERROR) << "Unexpected EOF on " << metadata_->fullPath
                << " need align " << alignedReadNeeded_ << " physicalRead "
                << physicalRead << " numRead " << numRead << " offset "
                << offset_ << " seekPos " << seekPos << " offsetRemainder "
                << offsetRemainder << " bytesRead " << bytesRead_;
    this->close();
    transferStats_.setLocalErrorCode(BYTE_SOURCE_READ_ERROR);
    return nullptr;
  }
  // Reading more than logicalRead can only happen with O_DIRECT. The
  // block-rounded physicalRead may extend past the requested range: at the
  // last chunk of the file, or when reading a sub-block range smaller than
  // the disk block size. Trim it back.
  size = numRead - offsetRemainder;
  if (size > logicalRead) {
    WDT_CHECK(alignedReadNeeded_);
    size = logicalRead;
  }
  bytesRead_ += size;
  WVLOG(1) << "Size " << size << " need align " << alignedReadNeeded_
           << " physicalRead " << physicalRead << " offset " << offset_
           << " seekPos " << seekPos << " offsetRemainder " << offsetRemainder
           << " bytesRead " << bytesRead_;
  return buffer->getData() + offsetRemainder;
}

/**
 * Advises the kernel (POSIX_FADV_DONTNEED) to drop cached pages for the
 * range read so far, [offset_, offset_ + bytesRead_).
 *
 * This does nothing in any of these cases:
 * - the build does not define HAS_POSIX_FADVISE;
 * - direct reads are used;
 * - no thread context or fd is held;
 * - nothing has been read;
 * - the `skip_fadvise` option is set.
 *
 * Failures are logged but are not fatal.
 */
void FileByteSource::clearPageCache() {
#ifdef HAS_POSIX_FADVISE
  if (metadata_->directReads) {
    // no need to clear page cache for direct reads
    return;
  }
  if (threadCtx_ == nullptr || fd_ < 0) {
    return;
  }
  const auto &options = threadCtx_->getOptions();
  if (bytesRead_ > 0 && !options.skip_fadvise) {
    PerfStatCollector statCollector(*threadCtx_, PerfStatReport::FADVISE);
    const int ret =
        posix_fadvise(fd_, offset_, bytesRead_, POSIX_FADV_DONTNEED);
    if (ret != 0) {
      // posix_fadvise returns its error number and leaves errno untouched.
      // Copy it into errno so WPLOG (presumably errno-based, like glog's
      // PLOG) reports the actual cause.
      errno = ret;
      WPLOG(ERROR) << "posix_fadvise failed for " << getIdentifier() << " "
                   << offset_ << " " << bytesRead_;
    }
  }
#endif
}

/**
 * Drops cached pages (see clearPageCache), then releases the fd.
 *
 * The fd is closed only if this source opened it; an fd borrowed from
 * `metadata_->fd` is left open. Always leaves fd_ == -1 and
 * threadCtx_ == nullptr.
 */
void FileByteSource::close() {
  clearPageCache();
  if (metadata_->fd >= 0) {
    // if the fd is not opened by this source, no need to close it
    WVLOG(1) << "No need to close " << getIdentifier()
             << ", this was not opened by FileByteSource";
  } else if (fd_ >= 0) {
    PerfStatCollector statCollector(*threadCtx_, PerfStatReport::FILE_CLOSE);
    if (::close(fd_) != 0) {
      WPLOG(ERROR) << "Error closing " << getIdentifier();
    }
  }
  fd_ = -1;
  threadCtx_ = nullptr;
}

}  // namespace wdt
}  // namespace facebook