cachelib/navy/serialization/RecordIO.cpp (205 lines of code) (raw):
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "cachelib/navy/serialization/RecordIO.h"
#include <folly/Format.h>
#include <folly/Range.h>
#include <folly/io/RecordIO.h>
using namespace folly::recordio_helpers;
namespace facebook {
namespace cachelib {
namespace navy {
constexpr uint32_t kMetadataHeaderFileId = 1;
namespace {
class FileRecordWriter final : public RecordWriter {
public:
explicit FileRecordWriter(int fd) : writer_{folly::File(fd)} {}
~FileRecordWriter() override = default;
void writeRecord(std::unique_ptr<folly::IOBuf> buf) override {
writer_.write(std::move(buf));
}
bool invalidate() override { return false; }
private:
folly::RecordIOWriter writer_;
};
class FileRecordReader final : public RecordReader {
public:
explicit FileRecordReader(int fd)
: reader_{folly::File(fd)}, curr_{reader_.seek(0)} {}
~FileRecordReader() override = default;
std::unique_ptr<folly::IOBuf> readRecord() override {
auto buf = folly::IOBuf::copyBuffer(curr_->first);
++curr_;
return buf;
}
bool isEnd() const override { return curr_ == reader_.end(); }
private:
folly::RecordIOReader reader_;
folly::RecordIOReader::Iterator curr_;
};
class DeviceMetaDataWriter final : public RecordWriter {
public:
explicit DeviceMetaDataWriter(Device& dev, size_t metadataSize)
: dev_(dev), metadataSize_{metadataSize} {}
~DeviceMetaDataWriter() override {
uint8_t* bufferData = buffer_.data();
// Write the last remaining bytes to the device
if (bufIndex_ > 0) {
if (offset_ + kBlockSize < metadataSize_) {
Buffer buffer = dev_.makeIOBuffer(kBlockSize);
memcpy(buffer.data(), bufferData, bufIndex_);
memset(buffer.data() + bufIndex_, 0, kBlockSize - bufIndex_);
dev_.write(offset_, std::move(buffer));
offset_ += kBlockSize;
}
}
if (offset_ + kBlockSize <= metadataSize_) {
// Write an additional block of zeroed out memory just to make the end
// of metadata clear
Buffer buffer = dev_.makeIOBuffer(kBlockSize);
memset(buffer.data(), 0, kBlockSize);
dev_.write(offset_, std::move(buffer));
}
}
void writeRecord(std::unique_ptr<folly::IOBuf> buf) override {
size_t totalLength = prependHeader(buf, kMetadataHeaderFileId);
if (totalLength == 0) {
return;
}
buf->unshare();
buf->coalesce();
auto size = buf->length();
auto data = buf->data();
auto dataOffset = 0;
uint8_t* bufferData = buffer_.data();
do {
if (bufIndex_ + headerSize() > kBlockSize) {
auto extraBytes = kBlockSize - bufIndex_;
// zero the unused bytes in the buffer
memset(&bufferData[bufIndex_], 0, extraBytes);
// Make sure we do not write beyond the maximum allocated for metadata
if (offset_ + kBlockSize > metadataSize_) {
throw std::logic_error("exceeding metadata limit");
}
Buffer buffer = dev_.makeIOBuffer(kBlockSize);
memcpy(buffer.data(), bufferData, kBlockSize);
if (!dev_.write(offset_, std::move(buffer))) {
throw std::invalid_argument(
folly::sformat("write failed: offset = {}", offset_));
}
offset_ += kBlockSize;
bufIndex_ = 0;
}
auto cpBytes = std::min(static_cast<uint64_t>(kBlockSize - bufIndex_),
static_cast<uint64_t>(size));
memcpy(&bufferData[bufIndex_], data + dataOffset, cpBytes);
dataOffset += cpBytes;
bufIndex_ += cpBytes;
size -= cpBytes;
} while (size > 0);
}
bool invalidate() override {
Buffer invalidateBuffer{kBlockSize, kBlockSize};
memset(invalidateBuffer.data(), 0, kBlockSize);
return dev_.write(0, std::move(invalidateBuffer));
}
private:
static constexpr uint32_t kBlockSize{4096};
Device& dev_;
uint64_t offset_{0};
size_t metadataSize_{0};
Buffer buffer_{kBlockSize, kBlockSize};
uint32_t bufIndex_{0};
};
class DeviceMetaDataReader final : public RecordReader {
public:
explicit DeviceMetaDataReader(Device& dev, size_t metadataSize)
: dev_{dev}, metadataSize_{metadataSize} {}
~DeviceMetaDataReader() override = default;
std::unique_ptr<folly::IOBuf> readRecord() override {
bool readHeader = true;
std::unique_ptr<folly::IOBuf> buf = nullptr;
uint8_t* bufferData = buffer_.data();
uint64_t size = 0;
uint8_t* data = nullptr;
auto dataOffset = 0;
do {
// This is true when we have to read a header and there are not
// enough bytes in the buffer OR we have to read the next block
// in the multi-block read
if (bufIndex_ + headerSize() > kBlockSize) {
// read new block from the device if the number of bytes left from
// previous read are less than header size.
if (offset_ + kBlockSize > metadataSize_) {
throw std::logic_error("exceeding metadata limit");
}
// read from device to the middle of the buffer 'kReadOffset'
if (!dev_.read(offset_, kBlockSize, bufferData)) {
throw std::invalid_argument(
folly::sformat("read failed: offset = {}", offset_));
}
offset_ += kBlockSize;
bufIndex_ = 0;
}
// Parse the header if we are expecting header
if (readHeader) {
readHeader = false;
auto valid = validateRecordHeader(
folly::Range<unsigned char*>(&bufferData[bufIndex_],
kBlockSize - bufIndex_),
kMetadataHeaderFileId);
if (!valid) {
throw std::logic_error("Invalid record header");
}
recordio_detail::Header* h =
reinterpret_cast<recordio_detail::Header*>(&bufferData[bufIndex_]);
size = headerSize() + h->dataLength;
// copy the header also to IOBuf so that we can do validation
buf = folly::IOBuf::create(size);
if (buf == nullptr) {
return nullptr;
}
buf->append(size);
data = buf->writableData();
dataOffset = 0;
}
auto cpSize =
std::min(static_cast<uint64_t>(kBlockSize - bufIndex_), size);
memcpy(data + dataOffset, &bufferData[bufIndex_], cpSize);
bufIndex_ += cpSize;
dataOffset += cpSize;
size -= cpSize;
} while (size > 0);
// Validate the what we just read from the device
auto record =
validateRecordData(folly::Range<unsigned char*>(data, buf->length()));
if (record.fileId == 0) {
throw std::invalid_argument(folly::sformat(
"Invalid record : offset = {}, length = {}", offset_, buf->length()));
}
// skip the header part and return
buf->trimStart(headerSize());
return buf;
}
bool isEnd() const override {
Buffer headerBuf{kBlockSize, kBlockSize};
if (offset_ + kBlockSize > metadataSize_) {
return true;
}
auto res = dev_.read(offset_, kBlockSize, headerBuf.data());
if (!res) {
return true;
}
auto valid = validateRecordHeader(
folly::Range<unsigned char*>(headerBuf.data(), kBlockSize),
kMetadataHeaderFileId);
return !valid;
}
private:
// TODO: T95780004 get block size from device or through constructor
static constexpr size_t kBlockSize = 4096;
Device& dev_;
uint64_t offset_{0};
size_t metadataSize_{0};
uint64_t bufIndex_{kBlockSize};
Buffer buffer_{kBlockSize, kBlockSize};
};
} // namespace
std::unique_ptr<RecordWriter> createMetadataRecordWriter(Device& dev,
size_t metadataSize) {
return std::make_unique<DeviceMetaDataWriter>(dev, metadataSize);
}
std::unique_ptr<RecordReader> createMetadataRecordReader(Device& dev,
size_t metadataSize) {
return std::make_unique<DeviceMetaDataReader>(dev, metadataSize);
}
std::unique_ptr<RecordWriter> createFileRecordWriter(int fd) {
return std::make_unique<FileRecordWriter>(fd);
}
std::unique_ptr<RecordReader> createFileRecordReader(int fd) {
return std::make_unique<FileRecordReader>(fd);
}
} // namespace navy
} // namespace cachelib
} // namespace facebook