cachelib/persistence/PersistenceManager.cpp (291 lines of code) (raw):

/* * Copyright (c) Facebook, Inc. and its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ // Copyright 2004-present Facebook. All Rights Reserved. #include "cachelib/persistence/PersistenceManager.h" #include <folly/File.h> #include <folly/FileUtil.h> #include <folly/hash/Checksum.h> #include <fstream> #include "cachelib/allocator/CacheAllocatorConfig.h" #include "cachelib/common/Utils.h" namespace facebook::cachelib::persistence { using CopyBufferOp = folly::IOBuf::CopyBufferOp; const char PersistenceManager::DATA_BEGIN_CHAR = static_cast<char>(28); const char PersistenceManager::DATA_MARK_CHAR = static_cast<char>(30); const char PersistenceManager::DATA_END_CHAR = static_cast<char>(31); /** * Cache data (shm and navy) will be split into multiple data blocks to limit * the memory usage while restore, especially navy data might be much larger * than free memory. Read/write buffering is another benefit, it depends on the * stream reader/writer implementation, users can do the async buffering. * * Each block contains data length, checksum, and data up to kDataBlockSize * length. When we persist shm data with exact kDataBlockSize length, we want to * avoid the copy through DataBlock.data, and pass it directly to the writer, * we define a DataBlockHeader that only container checksum and data length. * * All block length except the last one is kDataBlockSize. * The last block might have a smaller length, if the total data size * (cache_size/navy_file_size) can't be divided by kDataBlockSize evenly. */ struct FOLLY_PACK_ATTR DataBlockHeader { uint32_t checksum; uint32_t length; // less or equal than kDataBlockSize void setLengthAndComputeChecksum(size_t len, const uint8_t* data) { XDCHECK_LE(len, static_cast<size_t>(kDataBlockSize)); length = len; checksum = folly::crc32(data, length); } }; struct FOLLY_PACK_ATTR DataBlock { DataBlockHeader header; uint8_t data[kDataBlockSize]; void setData(size_t len, const uint8_t* ptr) { XDCHECK_LE(len, static_cast<size_t>(kDataBlockSize)); ::memcpy(data, ptr, len); header.setLengthAndComputeChecksum(len, data); } bool validate() const { return header.checksum == folly::crc32(data, header.length); } }; void PersistenceManager::saveCache(PersistenceStreamWriter& writer) { util::Timer timer; timer.startOrResume(); XLOGF(INFO, "Start saving cache: cacheName {}, cacheDir {}", *config_.cacheName_ref(), cacheDir_); writer.write(DATA_BEGIN_CHAR); // save versions { auto buf = Serializer::serializeToIOBuf(versions_); auto header = makeHeader(PersistenceType::Versions, buf->length()); // The persisted stream consists of headers that are thrift serialized and // data blocks that are custom serialized in binary format to stream in // chunks. While restoring/deserializing from the stream, we want to read // from the stream only the bytes around the serialization boundaries to // simplify implementation. Hence, we persist the size of thrift header in // binary format first so that we can read only that much from the stream // before proceeding to read/copy custom serialized blobs. Only one length // is persisted if we use BinarySerializer (fix encoding) not // CompactSerializer(variant encoding), this will be used to deserialize all // headers in restoreCache(). size_t headerLength = header.length(); writer.write( folly::IOBuf(CopyBufferOp::COPY_BUFFER, &headerLength, sizeof(size_t))); writer.write(header); writer.write(*buf); } // save configs { writer.write(DATA_MARK_CHAR); auto buf = Serializer::serializeToIOBuf(config_); writer.write(makeHeader(PersistenceType::Configs, buf->length())); writer.write(*buf); } // save meta data file (cache_dir/NvmCacheState) saveFile(writer, PersistenceType::NvmCacheState, NvmCacheState::getNvmCacheStateFilePath(cacheDir_)); // save shm_info auto shmInfo = saveShm(writer, PersistenceType::ShmInfo, detail::kShmInfoName); // save shm_hash_table auto shmHT = saveShm(writer, PersistenceType::ShmHT, detail::kShmHashTableName); // save shm_chained_alloc_hash_table auto shmChainedHT = saveShm(writer, PersistenceType::ShmChainedItemHT, detail::kShmChainedItemHashTableName); // save /dev/shm/shm_cache to multiple data blocks auto shmCache = saveShm(writer, PersistenceType::ShmData, detail::kShmCacheName); // save navy data { writer.write(DATA_MARK_CHAR); int32_t numBlocks = util::getAlignedSize(navyFileSize_, kDataBlockSize) / kDataBlockSize; writer.write(makeHeader(PersistenceType::NavyPartition, navyFileSize_)); for (const std::string& file : navyFiles_) { folly::File f(file); for (int32_t i = 0; i < numBlocks; ++i) { DataBlock db; // readFull function read up to kDataBlockSize bytes from file // and return the num of bytes read. auto res = folly::readFull(f.fd(), db.data, kDataBlockSize); CACHELIB_CHECK_THROWF(res != -1, "fail to write file {}, errno: {}", file, errno); db.header.setLengthAndComputeChecksum(res, db.data); writer.write( folly::IOBuf(CopyBufferOp::COPY_BUFFER, &db, sizeof(DataBlock))); } } } writer.write(DATA_END_CHAR); writer.flush(); timer.pause(); XLOGF(INFO, "saveCache finish, spent {} seconds", timer.getDurationSec()); } void PersistenceManager::restoreCache(PersistenceStreamReader& reader) { util::Timer timer; timer.startOrResume(); XLOGF(INFO, "Start restoring cache: cacheName {}, cacheDir {}", *config_.cacheName_ref(), cacheDir_); CACHELIB_CHECK_THROW(reader.read() == DATA_BEGIN_CHAR, "invalid beginning character"); auto headerLengthBuf = reader.read(sizeof(size_t)); size_t headerLength = cast<size_t>(headerLengthBuf.data()); ShmManager::cleanup(cacheDir_, true); ShmManager shmManager(cacheDir_, true); SCOPE_SUCCESS { shmManager.shutDown(); }; while (true) { auto headerBuf = reader.read(headerLength); CACHELIB_CHECK_THROW(headerBuf.length() == headerLength, "invalid data"); auto header = deserialize<PersistenceHeader>(headerBuf); size_t dataLen = static_cast<size_t>(*header.length_ref()); XLOGF(INFO, "restoreCache: type {}, len {}, header_len {}", static_cast<int>(*header.type_ref()), dataLen, headerLength); switch (*header.type_ref()) { case PersistenceType::Versions: { auto buf = reader.read(dataLen); CACHELIB_CHECK_THROW(buf.length() == dataLen, "invalid data"); deserializeAndValidateVersions(buf); break; } case PersistenceType::Configs: { auto buf = reader.read(dataLen); CACHELIB_CHECK_THROW(buf.length() == dataLen, "invalid data"); auto config = deserialize<PersistCacheLibConfig>(buf); CACHELIB_CHECK_THROWF(config == config_, "Config doesn't match: {}|{}", *config.cacheName_ref(), *config_.cacheName_ref()); break; } case PersistenceType::NvmCacheState: { auto buf = reader.read(dataLen); CACHELIB_CHECK_THROW(buf.length() == dataLen, "invalid data"); restoreFile(buf, NvmCacheState::getNvmCacheStateFilePath(cacheDir_)); break; } case PersistenceType::ShmInfo: { auto shm = shmManager.createShm(detail::kShmInfoName, dataLen); restoreDataFromBlocks(reader, static_cast<uint8_t*>(shm.addr), dataLen); break; } case PersistenceType::ShmHT: { auto shm = shmManager.createShm(detail::kShmHashTableName, dataLen); restoreDataFromBlocks(reader, static_cast<uint8_t*>(shm.addr), dataLen); break; } case PersistenceType::ShmChainedItemHT: { auto shm = shmManager.createShm(detail::kShmChainedItemHashTableName, dataLen); restoreDataFromBlocks(reader, static_cast<uint8_t*>(shm.addr), dataLen); break; } case PersistenceType::ShmData: { ShmSegmentOpts opts; opts.alignment = sizeof(Slab); // 4MB auto shm = shmManager.createShm(detail::kShmCacheName, *header.length_ref(), nullptr, opts); restoreDataFromBlocks(reader, static_cast<uint8_t*>(shm.addr), *header.length_ref()); break; } case PersistenceType::NavyPartition: { int32_t navyFileSize = *header.length_ref(); int32_t numBlock = util::getAlignedSize(navyFileSize, kDataBlockSize) / kDataBlockSize; for (size_t i = 0; i < navyFiles_.size(); ++i) { folly::File f(navyFiles_[i], O_CREAT | O_WRONLY | O_TRUNC); for (int32_t j = 0; j < numBlock; ++j) { auto buf = reader.read(sizeof(DataBlock)); CACHELIB_CHECK_THROW(buf.length() == sizeof(DataBlock), "invalid data"); const DataBlock& db = cast<DataBlock>(buf.data()); CACHELIB_CHECK_THROW(db.validate(), "invalid checksum"); auto res = folly::writeFull(f.fd(), db.data, db.header.length); CACHELIB_CHECK_THROWF(res != -1, "fail to write file {}, errno: {}", navyFiles_[i], errno); } } break; } default: CACHELIB_CHECK_THROWF(false, "Unknow header type: {}", static_cast<int>(*header.type_ref())); } char mark = reader.read(); switch (mark) { case DATA_MARK_CHAR: continue; case DATA_END_CHAR: timer.pause(); XLOGF(INFO, "restoreCache finish, spent {} seconds", timer.getDurationSec()); return; default: CACHELIB_CHECK_THROWF(false, "Unknown character: {}", mark); } } } folly::IOBuf PersistenceManager::makeHeader(PersistenceType type, size_t length) { PersistenceHeader header; header.type_ref() = type; header.length_ref() = length; // we must use apache::thrift::BinarySerializer not compact serializer, // so the integer is not compress (variant encoding) auto buf = Serializer::serializeToIOBuf<PersistenceHeader, apache::thrift::BinarySerializer>(header); return std::move(*buf); } void PersistenceManager::saveFile(PersistenceStreamWriter& writer, PersistenceType type, const folly::StringPiece file) { std::ifstream f(file.data()); CACHELIB_CHECK_THROWF(f, "fail to open file to read {}", file.data()); std::string buf{std::istreambuf_iterator<char>(f), std::istreambuf_iterator<char>()}; writer.write(DATA_MARK_CHAR); writer.write(makeHeader(type, buf.length())); writer.write( folly::IOBuf(CopyBufferOp::COPY_BUFFER, buf.data(), buf.length())); } void PersistenceManager::restoreFile(const folly::IOBuf& buf, const folly::StringPiece file) { if (buf.empty()) { return; } std::ofstream f(file.data(), std::ios::trunc); CACHELIB_CHECK_THROWF(f, "fail to open file to write {}", file.data()); f.write(reinterpret_cast<const char*>(buf.data()), buf.length()); if (f.good() && f.flush() && f.good()) { return; } // write failed CACHELIB_CHECK_THROWF(f, "fail to write to file {}, iostate: {}", file.data(), static_cast<int>(f.rdstate())); } std::unique_ptr<ShmSegment> PersistenceManager::saveShm( PersistenceStreamWriter& writer, PersistenceType type, const std::string& name) { auto segment = ShmManager::attachShmReadOnly(cacheDir_, name, true); auto shm = segment->getCurrentMapping(); CACHELIB_CHECK_THROWF(shm.size > 0, "shm {} is empty.", name); writer.write(DATA_MARK_CHAR); writer.write(makeHeader(type, shm.size)); saveDataInBlocks(writer, shm); return segment; } void PersistenceManager::saveDataInBlocks(PersistenceStreamWriter& writer, const ShmAddr& shm) { const uint8_t* ptr = static_cast<uint8_t*>(shm.addr); uint32_t numBlock = util::getAlignedSize(shm.size, kDataBlockSize) / kDataBlockSize; const uint8_t* endPtr = ptr + shm.size; for (uint32_t i = 0; i < numBlock; ++i) { if (ptr + kDataBlockSize > endPtr) { // last data block might have less data than kDataBlockSize DataBlock db; db.setData(endPtr - ptr, ptr); writer.write( folly::IOBuf(CopyBufferOp::COPY_BUFFER, &db, sizeof(DataBlock))); ptr += db.header.length; } else { // data blocks other than the last one have exactly kDataBlockSize // length of data, we can avoid the extra data copy. DataBlockHeader dbh; dbh.setLengthAndComputeChecksum(kDataBlockSize, ptr); auto buf = folly::IOBuf(CopyBufferOp::COPY_BUFFER, &dbh, sizeof(DataBlockHeader)); // chained header and data to make a single write and be consistent with // restore buf.appendToChain(folly::IOBuf::wrapBuffer(ptr, kDataBlockSize)); // we will trigger flush before shm dropped, so wrapBuffer is safe writer.write(std::move(buf)); ptr += kDataBlockSize; } } } void PersistenceManager::restoreDataFromBlocks(PersistenceStreamReader& reader, uint8_t* ptr, size_t size) { uint32_t numBlock = util::getAlignedSize(size, kDataBlockSize) / kDataBlockSize; for (uint32_t i = 0; i < numBlock; ++i) { auto buf = reader.read(sizeof(DataBlock)); CACHELIB_CHECK_THROW(buf.length() == sizeof(DataBlock), "invalid data"); const DataBlock& db = cast<DataBlock>(buf.data()); CACHELIB_CHECK_THROW(db.validate(), "invalid checksum"); ::memcpy(ptr, db.data, db.header.length); ptr += db.header.length; } } void PersistenceManager::deserializeAndValidateVersions( const folly::IOBuf& buf) { auto versions = deserialize<CacheLibVersions>(buf); CACHELIB_CHECK_THROWF(*versions.persistenceVersion_ref() == *versions_.persistenceVersion_ref(), "Persistence Version doesn't match: {}|{}", *versions.persistenceVersion_ref(), *versions_.persistenceVersion_ref()); if (versions != versions_) { // print a warning log for cache version mismatch, // attaching the cache will might be anble to do upgrade // even cache format changed. XLOGF(WARN, "Cache Version doesn't match: {}|{} {}|{}, {}|{}", *versions.allocatorVersion_ref(), *versions_.allocatorVersion_ref(), *versions.ramFormatVerson_ref(), *versions_.ramFormatVerson_ref(), *versions.nvmFormatVersion_ref(), *versions_.nvmFormatVersion_ref()); } } } // namespace facebook::cachelib::persistence