cachelib/navy/common/Device.cpp (301 lines of code) (raw):

/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "cachelib/navy/common/Device.h"

#include <folly/File.h>
#include <folly/Format.h>

#include <algorithm>
#include <cerrno>
#include <cstring>
#include <functional>
#include <memory>
#include <numeric>
#include <stdexcept>
#include <vector>

namespace facebook {
namespace cachelib {
namespace navy {

namespace {
// Common call shape for ::pread and ::pwrite. RAID0Device uses it to run reads
// and writes through the same striping loop.
using IOOperation =
    std::function<ssize_t(int fd, void* buf, size_t count, off_t offset)>;

// Device on Unix file descriptor
class FileDevice final : public Device {
 public:
  // Takes ownership of @file. The remaining arguments are forwarded to the
  // Device base class unchanged.
  FileDevice(folly::File file,
             uint64_t size,
             uint32_t ioAlignSize,
             std::shared_ptr<DeviceEncryptor> encryptor,
             uint32_t maxDeviceWriteSize)
      : Device{size, std::move(encryptor), ioAlignSize, maxDeviceWriteSize},
        file_{std::move(file)} {}
  FileDevice(const FileDevice&) = delete;
  FileDevice& operator=(const FileDevice&) = delete;

  ~FileDevice() override {}

 private:
  // Writes @size bytes from @value at @offset with a single pwrite call.
  // Returns true only if the full size was written; anything else (error or
  // short write) is logged and reported as failure.
  bool writeImpl(uint64_t offset, uint32_t size, const void* value) override {
    ssize_t bytesWritten = ::pwrite(file_.fd(), value, size, offset);
    if (bytesWritten != size) {
      reportIOError("write", offset, size, bytesWritten);
    }
    return bytesWritten == size;
  }

  // Reads @size bytes at @offset into @value with a single pread call.
  // Returns true only if the full size was read; anything else (error or
  // short read) is logged and reported as failure.
  bool readImpl(uint64_t offset, uint32_t size, void* value) override {
    ssize_t bytesRead = ::pread(file_.fd(), value, size, offset);
    if (bytesRead != size) {
      reportIOError("read", offset, size, bytesRead);
    }
    return bytesRead == size;
  }

  // Syncs the file. The interface returns void, so a failure cannot be
  // propagated; it is logged instead of being silently dropped.
  void flushImpl() override {
    if (::fsync(file_.fd()) != 0) {
      const int err = errno;
      XLOG_EVERY_N_THREAD(ERR, 1000,
                          folly::sformat("IO error: fsync errno={} ({})", err,
                                         std::strerror(err)));
    }
  }

  // Logs a rate-limited description of a failed or short IO.
  // Must be called right after the failing syscall so errno is still the one
  // it set.
  void reportIOError(const char* opName,
                     uint64_t offset,
                     uint32_t size,
                     ssize_t ioRet) {
    // errno is only meaningful when the call itself failed. On a short
    // transfer (ioRet >= 0) it holds a stale value from some earlier call,
    // so report 0 instead of a misleading error.
    const int err = (ioRet < 0) ? errno : 0;
    XLOG_EVERY_N_THREAD(
        ERR, 1000,
        folly::sformat("IO error: {} offset={} size={} ret={} errno={} ({})",
                       opName, offset, size, ioRet, err, std::strerror(err)));
  }

  const folly::File file_{};
};

// RAID0 device spanning multiple files
class RAID0Device final : public Device {
 public:
  // @fvec        files making up the array; the logical device size is
  //              fdSize * fvec.size()
  // @fdSize      size of each individual file; must be a multiple of
  //              @stripeSize
  // @stripeSize  number of contiguous bytes placed on one file before moving
  //              on to the next
  //
  // Throws std::invalid_argument if @fvec is empty, @stripeSize is zero, or
  // @fdSize is not a multiple of @stripeSize.
  RAID0Device(std::vector<folly::File> fvec,
              uint64_t fdSize,
              uint32_t ioAlignSize,
              uint32_t stripeSize,
              std::shared_ptr<DeviceEncryptor> encryptor,
              uint32_t maxDeviceWriteSize)
      : Device{fdSize * fvec.size(), std::move(encryptor), ioAlignSize,
               maxDeviceWriteSize},
        fvec_{std::move(fvec)},
        stripeSize_(stripeSize) {
    XDCHECK_GT(ioAlignSize, 0u);
    XDCHECK_GT(stripeSize_, 0u);
    XDCHECK_GE(stripeSize_, ioAlignSize);
    XDCHECK_EQ(0u, stripeSize_ % 2) << stripeSize_;
    XDCHECK_EQ(0u, stripeSize_ % ioAlignSize)
        << stripeSize_ << ", " << ioAlignSize;

    // The XDCHECKs above only run in debug builds. The two conditions below
    // would otherwise lead to a division by zero (here and in doIO), so they
    // are rejected explicitly in every build mode.
    if (fvec_.empty()) {
      throw std::invalid_argument(
          "Invalid RAID0 configuration: at least one file is required");
    }
    if (stripeSize == 0) {
      throw std::invalid_argument(
          "Invalid RAID0 configuration: stripe size must be greater than 0");
    }

    if (fdSize % stripeSize != 0) {
      throw std::invalid_argument(
          folly::sformat("Invalid size because individual device size: {} is "
                         "not aligned to stripe size: {}",
                         fdSize, stripeSize));
    }
  }
  RAID0Device(const RAID0Device&) = delete;
  RAID0Device& operator=(const RAID0Device&) = delete;

  ~RAID0Device() override {}

 private:
  // Striped write. pwrite never modifies the buffer, so dropping const to fit
  // the shared IOOperation signature is safe.
  bool writeImpl(uint64_t offset, uint32_t size, const void* value) override {
    IOOperation io = ::pwrite;
    return doIO(offset, size, const_cast<void*>(value), "RAID0 WRITE", io);
  }

  // Striped read into @value.
  bool readImpl(uint64_t offset, uint32_t size, void* value) override {
    IOOperation io = ::pread;
    return doIO(offset, size, value, "RAID0 READ", io);
  }

  // Syncs every file of the array. Failures are logged (the interface returns
  // void) and the remaining files are still synced.
  void flushImpl() override {
    for (size_t i = 0; i < fvec_.size(); ++i) {
      if (::fsync(fvec_[i].fd()) != 0) {
        const int err = errno;
        XLOG_EVERY_N_THREAD(
            ERR, 1000,
            folly::sformat("IO error: RAID0 fsync fdIdx={} errno={} ({})", i,
                           err, std::strerror(err)));
      }
    }
  }

  // Splits the logical range [offset, offset + size) at stripe boundaries and
  // issues one @io call per piece against the file that owns the stripe.
  // Stops at the first piece that does not transfer its full size.
  //
  // Returns true if every piece was transferred completely, false otherwise.
  bool doIO(uint64_t offset,
            uint32_t size,
            void* value,
            const char* opName,
            IOOperation& io) {
    uint8_t* buf = reinterpret_cast<uint8_t*>(value);

    while (size > 0) {
      // Stripes are assigned to the files round-robin: logical stripe s lives
      // on file (s % N), as that file's (s / N)-th stripe.
      uint64_t stripe = offset / stripeSize_;
      uint32_t fdIdx = stripe % fvec_.size();
      uint64_t stripeStartOffset = (stripe / fvec_.size()) * stripeSize_;
      uint32_t ioOffsetInStripe = offset % stripeSize_;
      // Never cross into the next stripe within a single call.
      uint32_t allowedIOSize = std::min(size, stripeSize_ - ioOffsetInStripe);

      ssize_t retSize = io(fvec_[fdIdx].fd(), buf, allowedIOSize,
                           stripeStartOffset + ioOffsetInStripe);
      if (retSize != allowedIOSize) {
        // errno is only meaningful when the call itself failed; on a short
        // transfer it would be a stale value from an earlier call.
        const int err = (retSize < 0) ? errno : 0;
        XLOG_EVERY_N_THREAD(
            ERR, 1000,
            folly::sformat(
                "IO error: {} logicalOffset={} logicalIOSize={} stripeSize={} "
                "stripe={} offsetInStripe={} stripeIOSize={} ret={} errno={} "
                "({})",
                opName, offset, size, stripeSize_, stripe, ioOffsetInStripe,
                allowedIOSize, retSize, err, std::strerror(err)));
        return false;
      }

      size -= allowedIOSize;
      offset += allowedIOSize;
      buf += allowedIOSize;
    }
    return true;
  }

  const std::vector<folly::File> fvec_{};
  const uint32_t stripeSize_{};
};

// Device on memory buffer
class MemoryDevice final : public Device {
 public:
  // Allocates a buffer of @size bytes that backs the device. The max device
  // write size is passed to the base class as 0.
  explicit MemoryDevice(uint64_t size,
                        std::shared_ptr<DeviceEncryptor> encryptor,
                        uint32_t ioAlignSize)
      : Device{size, std::move(encryptor), ioAlignSize,
               0 /* max device write size */},
        buffer_{std::make_unique<uint8_t[]>(size)} {}
  MemoryDevice(const MemoryDevice&) = delete;
  MemoryDevice& operator=(const MemoryDevice&) = delete;
  ~MemoryDevice() override = default;

 private:
  // Copies @size bytes from @value into the buffer at @offset. Always
  // succeeds; the range is only checked in debug builds.
  bool writeImpl(uint64_t offset,
                 uint32_t size,
                 const void* value) noexcept override {
    XDCHECK_LE(offset + size, getSize());
    std::memcpy(buffer_.get() + offset, value, size);
    return true;
  }

  // Copies @size bytes from the buffer at @offset into @value. Always
  // succeeds; the range is only checked in debug builds.
  bool readImpl(uint64_t offset, uint32_t size, void* value) override {
    XDCHECK_LE(offset + size, getSize());
    std::memcpy(value, buffer_.get() + offset, size);
    return true;
  }

  void flushImpl() override {
    // Noop
  }

  std::unique_ptr<uint8_t[]> buffer_;
};
} // namespace

// Writes the contents of buffer to the device at offset.
// If encryptor_ is defined, the buffer is encrypted in place first; an
// encryption failure is counted and nothing is written. The data is then
// written in pieces of at most maxWriteSize_ bytes (a single piece when
// maxWriteSize_ is 0). The first failing piece aborts the remaining ones.
//
// returns true if every piece was written, false otherwise.
bool Device::write(uint64_t offset, Buffer buffer) {
  const auto size = buffer.size();
  XDCHECK_LE(offset + buffer.size(), size_);
  uint8_t* data = reinterpret_cast<uint8_t*>(buffer.data());
  XDCHECK_EQ(reinterpret_cast<uint64_t>(data) % ioAlignmentSize_, 0ul);
  if (encryptor_) {
    XCHECK_EQ(offset % encryptor_->encryptionBlockSize(), 0ul);
    auto res = encryptor_->encrypt(folly::MutableByteRange{data, size}, offset);
    if (!res) {
      encryptionErrors_.inc();
      return false;
    }
  }

  auto remainingSize = size;
  auto maxWriteSize = (maxWriteSize_ == 0) ? remainingSize : maxWriteSize_;
  bool result = true;
  while (remainingSize > 0) {
    auto writeSize = std::min<size_t>(maxWriteSize, remainingSize);
    XDCHECK_EQ(offset % ioAlignmentSize_, 0ul);
    XDCHECK_EQ(writeSize % ioAlignmentSize_, 0ul);

    // Latency is tracked per piece, whether or not the piece succeeds.
    auto timeBegin = getSteadyClock();
    result = writeImpl(offset, writeSize, data);
    writeLatencyEstimator_.trackValue(
        toMicros((getSteadyClock() - timeBegin)).count());

    if (result) {
      bytesWritten_.add(writeSize);
    } else {
      // One part of the write failed so we abort the rest
      break;
    }
    offset += writeSize;
    data += writeSize;
    remainingSize -= writeSize;
  }
  if (!result) {
    writeIOErrors_.inc();
  }
  return result;
}

// reads size number of bytes from the device from the offset into value.
// Both offset and size are expected to be aligned for device IO operations.
// If successful and encryptor_ is defined, the size bytes read into value
// are decrypted in place.
//
// returns true if successful, false otherwise.
bool Device::readInternal(uint64_t offset, uint32_t size, void* value) {
  XDCHECK_EQ(reinterpret_cast<uint64_t>(value) % ioAlignmentSize_, 0ul);
  XDCHECK_EQ(offset % ioAlignmentSize_, 0ul);
  XDCHECK_EQ(size % ioAlignmentSize_, 0ul);
  XDCHECK_LE(offset + size, size_);
  auto timeBegin = getSteadyClock();
  bool result = readImpl(offset, size, value);
  readLatencyEstimator_.trackValue(
      toMicros(getSteadyClock() - timeBegin).count());
  if (!result) {
    readIOErrors_.inc();
    return result;
  }
  bytesRead_.add(size);
  if (encryptor_) {
    XCHECK_EQ(offset % encryptor_->encryptionBlockSize(), 0ul);
    auto res = encryptor_->decrypt(
        folly::MutableByteRange{reinterpret_cast<uint8_t*>(value), size},
        offset);
    if (!res) {
      decryptionErrors_.inc();
      return false;
    }
  }
  return true;
}

// This API reads size bytes from the Device from offset into a Buffer and
// returns the Buffer. If offset and size are not aligned to device's
// ioAlignmentSize_, IO aligned offset and IO aligned size are determined
// and passed to device read. Upon successful read from the device, the
// buffer is adjusted to return the intended data by trimming the data in
// the front and back.
// An empty buffer is returned in case of error and the caller must check
// the buffer size returned with size passed in to check for errors.
Buffer Device::read(uint64_t offset, uint32_t size) {
  XDCHECK_LE(offset + size, size_);
  // The masks below round the offset down to a multiple of ioAlignmentSize_
  // and extract the remainder; this is only correct when ioAlignmentSize_ is
  // a power of two.
  uint64_t readOffset =
      offset & ~(static_cast<uint64_t>(ioAlignmentSize_) - 1ul);
  uint64_t readPrefixSize =
      offset & (static_cast<uint64_t>(ioAlignmentSize_) - 1ul);
  auto readSize = getIOAlignedSize(readPrefixSize + size);
  auto buffer = makeIOBuffer(readSize);
  bool result = readInternal(readOffset, readSize, buffer.data());
  if (!result) {
    return Buffer{};
  }
  buffer.trimStart(readPrefixSize);
  buffer.shrink(size);
  return buffer;
}

// This API reads size bytes from the Device from the offset into value.
// Both offset and size are expected to be IO aligned.
bool Device::read(uint64_t offset, uint32_t size, void* value) {
  return readInternal(offset, size, value);
}

// Reports the device's byte counters, read/write latency estimators and
// error counters to the visitor.
void Device::getCounters(const CounterVisitor& visitor) const {
  visitor("navy_device_bytes_written", getBytesWritten());
  visitor("navy_device_bytes_read", getBytesRead());
  readLatencyEstimator_.visitQuantileEstimator(visitor,
                                               "navy_device_read_latency_us");
  writeLatencyEstimator_.visitQuantileEstimator(visitor,
                                                "navy_device_write_latency_us");
  visitor("navy_device_read_errors", readIOErrors_.get());
  visitor("navy_device_write_errors", writeIOErrors_.get());
  visitor("navy_device_encryption_errors", encryptionErrors_.get());
  visitor("navy_device_decryption_errors", decryptionErrors_.get());
}

// Creates a file-backed device. Both the IO alignment size and the max
// device write size are passed to FileDevice as 0.
std::unique_ptr<Device> createFileDevice(
    folly::File file,
    uint64_t size,
    std::shared_ptr<DeviceEncryptor> encryptor) {
  return std::make_unique<FileDevice>(std::move(file), size, 0,
                                      std::move(encryptor),
                                      0 /* max device write size */);
}

// Creates a file-backed device with the given IO alignment and max device
// write size. ioAlignSize is expected to be a power of two (checked in debug
// builds only).
std::unique_ptr<Device> createDirectIoFileDevice(
    folly::File file,
    uint64_t size,
    uint32_t ioAlignSize,
    std::shared_ptr<DeviceEncryptor> encryptor,
    uint32_t maxDeviceWriteSize) {
  XDCHECK(folly::isPowTwo(ioAlignSize));
  return std::make_unique<FileDevice>(std::move(file), size, ioAlignSize,
                                      std::move(encryptor), maxDeviceWriteSize);
}

// Creates a RAID0 device striped across the files in fvec. ioAlignSize is
// expected to be a power of two (checked in debug builds only).
// Throws std::invalid_argument if fvec is empty, stripeSize is zero, or size
// is not a multiple of stripeSize.
std::unique_ptr<Device> createDirectIoRAID0Device(
    std::vector<folly::File> fvec,
    uint64_t size, // size of each device in the RAID
    uint32_t ioAlignSize,
    uint32_t stripeSize,
    std::shared_ptr<DeviceEncryptor> encryptor,
    uint32_t maxDeviceWriteSize) {
  XDCHECK(folly::isPowTwo(ioAlignSize));
  return std::make_unique<RAID0Device>(std::move(fvec), size, ioAlignSize,
                                       stripeSize, std::move(encryptor),
                                       maxDeviceWriteSize);
}

// Creates a device backed by an in-memory buffer of size bytes.
std::unique_ptr<Device> createMemoryDevice(
    uint64_t size,
    std::shared_ptr<DeviceEncryptor> encryptor,
    uint32_t ioAlignSize) {
  return std::make_unique<MemoryDevice>(size, std::move(encryptor),
                                        ioAlignSize);
}
} // namespace navy
} // namespace cachelib
} // namespace facebook