cpp-ch/local-engine/Disks/ObjectStorages/MetadataStorageFromRocksDB.cpp (263 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "MetadataStorageFromRocksDB.h"
#if USE_ROCKSDB
#include <Disks/ObjectStorages/DiskObjectStorageMetadata.h>
#include <Disks/ObjectStorages/MetadataStorageFromRocksDBTransactionOperations.h>
#include <Disks/ObjectStorages/StaticDirectoryIterator.h>
#include <Interpreters/Context.h>
#include <Storages/MergeTree/MetaDataHelper.h>
#include <rocksdb/db.h>
#include <Common/QueryContext.h>

namespace local_engine
{

/// Returns the value of `<config_prefix>.key_compatibility_prefix`, falling back to the
/// common key prefix of `object_storage` when the setting is absent.
static std::string getObjectKeyCompatiblePrefix(
    const DB::IObjectStorage & object_storage, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
    return config.getString(config_prefix + ".key_compatibility_prefix", object_storage.getCommonKeyPrefix());
}

/// Factory: builds a RocksDB-backed metadata storage from configuration.
///
/// Reads `<config_prefix>.metadata_path` (no default is supplied) and
/// `<config_prefix>.clean_meta_task_interval_seconds` (default 60 * 60 * 12, i.e. 12 hours),
/// makes sure the metadata directory exists and constructs the storage.
/// The first (unnamed) string argument is ignored.
DB::MetadataStoragePtr MetadataStorageFromRocksDB::create(
    const std::string &,
    const Poco::Util::AbstractConfiguration & config,
    const std::string & config_prefix,
    DB::ObjectStoragePtr object_storage)
{
    auto metadata_path = config.getString(config_prefix + ".metadata_path");
    size_t clean_meta_task_interval_seconds = config.getUInt(config_prefix + ".clean_meta_task_interval_seconds", 60 * 60 * 12);
    fs::create_directories(metadata_path);
    auto key_compatibility_prefix = getObjectKeyCompatiblePrefix(*object_storage, config, config_prefix);
    return std::make_shared<MetadataStorageFromRocksDB>(
        key_compatibility_prefix, metadata_path, object_storage, clean_meta_task_interval_seconds);
}

/// Opens (creating it if missing) the RocksDB instance located at `rocksdb_dir` and schedules
/// the periodic metadata cleanup task on the global context's schedule pool.
///
/// Throws (via throwRockDBErrorNotOk) when the database cannot be opened.
MetadataStorageFromRocksDB::MetadataStorageFromRocksDB(
    const String & compatible_key_prefix,
    const String & rocksdb_dir,
    DB::ObjectStoragePtr & object_storage_,
    size_t metadata_clean_task_interval_seconds_)
    : compatible_key_prefix(compatible_key_prefix)
    , rocksdb_dir(rocksdb_dir)
    , object_storage(object_storage_)
    , metadata_clean_task_interval_seconds(metadata_clean_task_interval_seconds_)
{
    /// The logger must exist before the cleanup task can possibly run: the task logs through it,
    /// and with a very small interval it may start before this constructor returns.
    logger = getLogger("MetadataStorageFromRocksDB");

    rocksdb::Options options;
    options.create_if_missing = true;
    throwRockDBErrorNotOk(rocksdb::DB::Open(options, rocksdb_dir, &rocksdb));

    /// The task captures `this`; shutdown() deactivates it.
    metadata_clean_task = QueryContext::globalContext()->getSchedulePool().createTask(
        "MetadataStorageFromRocksDB", [this] { cleanOutdatedMetadataThreadFunc(); });
    /// The interval is configured in seconds; it is multiplied by 1000 here (presumably milliseconds).
    metadata_clean_task->scheduleAfter(metadata_clean_task_interval_seconds * 1000);
}

/// Creates a new transaction bound to this storage.
DB::MetadataTransactionPtr MetadataStorageFromRocksDB::createTransaction()
{
    return std::make_shared<MetadataStorageFromRocksDBTransaction>(*this);
}

/// Returns the directory that holds the RocksDB database.
const std::string & MetadataStorageFromRocksDB::getPath() const
{
    return rocksdb_dir;
}

/// This storage reports MetadataStorageType::None.
DB::MetadataStorageType MetadataStorageFromRocksDB::getType() const
{
    return DB::MetadataStorageType::None;
}

/// Returns the result of exist() for `path`, regardless of whether it is a file or a directory.
bool MetadataStorageFromRocksDB::existsFileOrDirectory(const std::string & path) const
{
    return exist(getRocksDB(), path);
}

/// True when data can be fetched for `path` and it is not the directory marker.
bool MetadataStorageFromRocksDB::existsFile(const std::string & path) const
{
    std::string data;
    return tryGetData(getRocksDB(), path, &data) && data != RocksDBCreateDirectoryOperation::DIR_DATA;
}

/// True when data can be fetched for `path` and it equals the directory marker.
bool MetadataStorageFromRocksDB::existsDirectory(const std::string & path) const
{
    std::string data;
    return tryGetData(getRocksDB(), path, &data) && data == RocksDBCreateDirectoryOperation::DIR_DATA;
}

/// Returns the total size in bytes recorded in the metadata stored under `path`.
uint64_t MetadataStorageFromRocksDB::getFileSize(const std::string & path) const
{
    return readMetadata(path)->getTotalSizeBytes();
}

/// Modification times are not tracked: the path is ignored and a default-constructed timestamp is returned.
Poco::Timestamp MetadataStorageFromRocksDB::getLastModified(const std::string & /*path*/) const
{
    return {};
}

/// chmod is not supported by this storage.
bool MetadataStorageFromRocksDB::supportsChmod() const
{
    return false;
}

/// stat is not supported by this storage.
bool MetadataStorageFromRocksDB::supportsStat() const
{
    return false;
}

/// No partition command is supported, whatever the command is.
bool MetadataStorageFromRocksDB::supportsPartitionCommand(const DB::PartitionCommand & /*command*/) const
{
    return false;
}

/// Returns the keys that listKeys() reports for `path`.
std::vector<std::string> MetadataStorageFromRocksDB::listDirectory(const std::string & path) const
{
    return listKeys(getRocksDB(), path);
}

/// Returns an iterator over a snapshot of the keys that listKeys() reports for `path`.
DB::DirectoryIteratorPtr MetadataStorageFromRocksDB::iterateDirectory(const std::string & path) const
{
    auto files = listKeys(getRocksDB(), path);
    std::vector<std::filesystem::path> paths;
    paths.reserve(files.size());
    for (const auto & file : files)
        paths.emplace_back(file);
    return std::make_unique<DB::StaticDirectoryIterator>(std::move(paths));
}

/// Hard links are not tracked: the path is ignored and 0 is always returned.
uint32_t MetadataStorageFromRocksDB::getHardlinkCount(const std::string & /*path*/) const
{
    return 0;
}

/// Converts the metadata stored under `path` into the list of stored objects it refers to,
/// one entry per (key, meta) pair, in the order the metadata provides them.
DB::StoredObjects MetadataStorageFromRocksDB::getStorageObjects(const std::string & path) const
{
    auto metadata = readMetadata(path);
    const auto & keys_with_meta = metadata->getKeysWithMeta();

    DB::StoredObjects objects;
    objects.reserve(keys_with_meta.size());
    for (const auto & [object_key, object_meta] : keys_with_meta)
        objects.emplace_back(object_key.serialize(), path, object_meta.size_bytes, object_meta.offset);
    return objects;
}

/// Reads and deserializes the metadata stored under `path` while holding a shared lock on the metadata mutex.
DB::DiskObjectStorageMetadataPtr MetadataStorageFromRocksDB::readMetadata(const std::string & path) const
{
    std::shared_lock lock(metadata_mutex);
    return readMetadataUnlocked(path, lock);
}

/// Reads and deserializes the metadata stored under `path` without taking any lock itself.
/// The lock argument is not used in the body; presumably it exists so that callers
/// demonstrate they already hold the metadata mutex exclusively.
DB::DiskObjectStorageMetadataPtr
MetadataStorageFromRocksDB::readMetadataUnlocked(const std::string & path, std::unique_lock<DB::SharedMutex> &) const
{
    auto metadata = std::make_unique<DB::DiskObjectStorageMetadata>(compatible_key_prefix, path);
    auto str = getData(getRocksDB(), path);
    metadata->deserializeFromString(str);
    return metadata;
}
DB::DiskObjectStorageMetadataPtr MetadataStorageFromRocksDB::readMetadataUnlocked(const std::string & path, std::shared_lock<DB::SharedMutex> &) const { auto metadata = std::make_unique<DB::DiskObjectStorageMetadata>(compatible_key_prefix, path); auto str = getData(getRocksDB(), path); metadata->deserializeFromString(str); return metadata; } std::string MetadataStorageFromRocksDB::readFileToString(const std::string & path) const { return getData(getRocksDB(), path); } void MetadataStorageFromRocksDB::shutdown() { metadata_clean_task->deactivate(); if (rocksdb) { rocksdb->Close(); rocksdb = nullptr; } } void MetadataStorageFromRocksDB::cleanOutdatedMetadataThreadFunc() { LOG_INFO(logger, "start to clean disk metadata in rocksdb."); std::queue<String> part_queue; size_t total_count_remove = 0; auto removeParts = [&] { while (!part_queue.empty()) { auto meta_name = part_queue.front(); part_queue.pop(); std::filesystem::path meta_path(meta_name); auto part_path = meta_path.parent_path(); auto files = listDirectory(part_path); total_count_remove += (files.size() + 1); getRocksDB().DeleteRange({}, part_path.generic_string(), files.back()); getRocksDB().Delete({}, files.back()); } }; auto * it = getRocksDB().NewIterator({}); String prev_key; String prev_data; for (it->SeekToFirst(); it->Valid(); it->Next()) { auto file_name = it->key().ToString(); // mark outdated part if (isMergeTreePartMetaDataFile(file_name)) { auto objects = getStorageObjects(it->key().ToString()); if (!object_storage->exists(objects.front())) part_queue.push(file_name); } // clean empty directory if (!prev_key.empty() && !file_name.starts_with(prev_key) && prev_data == RocksDBCreateDirectoryOperation::DIR_DATA) { getRocksDB().Delete({}, prev_key); total_count_remove ++; } if (part_queue.size() > 10000) { removeParts(); } } removeParts(); rocksdb::Slice begin(nullptr, 0); rocksdb::Slice end(nullptr, 0); rocksdb::Status s = getRocksDB().CompactRange({}, &begin, &end); LOG_INFO(logger, "Clean meta 
finish, totally clean {} meta", total_count_remove); metadata_clean_task->scheduleAfter(metadata_clean_task_interval_seconds * 1000); } DB::SharedMutex & MetadataStorageFromRocksDB::getMetadataMutex() const { return metadata_mutex; } rocksdb::DB & MetadataStorageFromRocksDB::getRocksDB() const { if (!rocksdb) throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "RocksDB is not initialized"); return *rocksdb; } void MetadataStorageFromRocksDBTransaction::commit() { commitImpl(metadata_storage.getMetadataMutex()); } const DB::IMetadataStorage & MetadataStorageFromRocksDBTransaction::getStorageForNonTransactionalReads() const { return metadata_storage; } bool MetadataStorageFromRocksDBTransaction::supportsChmod() const { return false; } void MetadataStorageFromRocksDBTransaction::createEmptyMetadataFile(const std::string & path) { auto metadata = std::make_unique<DB::DiskObjectStorageMetadata>(metadata_storage.compatible_key_prefix, path); writeStringToFile(path, metadata->serializeToString()); } void MetadataStorageFromRocksDBTransaction::createMetadataFile(const std::string & path, DB::ObjectStorageKey key, uint64_t size_in_bytes) { auto metadata = std::make_unique<DB::DiskObjectStorageMetadata>(metadata_storage.compatible_key_prefix, path); metadata->addObject(std::move(key), size_in_bytes); auto data = metadata->serializeToString(); if (!data.empty()) addOperation(std::make_unique<RocksDBWriteFileOperation>(path, metadata_storage.getRocksDB(), data)); } void MetadataStorageFromRocksDBTransaction::writeStringToFile(const std::string & path, const std::string & data) { addOperation(std::make_unique<RocksDBWriteFileOperation>(path, metadata_storage.getRocksDB(), data)); } void MetadataStorageFromRocksDBTransaction::createDirectory(const std::string & path) { addOperation(std::make_unique<RocksDBCreateDirectoryOperation>(path, metadata_storage.getRocksDB())); } void MetadataStorageFromRocksDBTransaction::createDirectoryRecursive(const std::string & path) { 
addOperation(std::make_unique<RocksDBCreateDirectoryRecursiveOperation>(path, metadata_storage.getRocksDB())); } void MetadataStorageFromRocksDBTransaction::removeDirectory(const std::string & path) { addOperation(std::make_unique<RocksDBRemoveDirectoryOperation>(path, metadata_storage.getRocksDB())); } void MetadataStorageFromRocksDBTransaction::removeRecursive(const std::string & path) { addOperation(std::make_unique<RocksDBRemoveRecursiveOperation>(path, metadata_storage.getRocksDB())); } void MetadataStorageFromRocksDBTransaction::unlinkFile(const std::string & path) { addOperation(std::make_unique<RocksDBUnlinkFileOperation>(path, metadata_storage.getRocksDB())); } } #endif