cpp-ch/local-engine/Disks/ObjectStorages/MetadataStorageFromRocksDB.cpp (263 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "MetadataStorageFromRocksDB.h"
#if USE_ROCKSDB
#include <Disks/ObjectStorages/DiskObjectStorageMetadata.h>
#include <Disks/ObjectStorages/MetadataStorageFromRocksDBTransactionOperations.h>
#include <Disks/ObjectStorages/StaticDirectoryIterator.h>
#include <Interpreters/Context.h>
#include <Storages/MergeTree/MetaDataHelper.h>
#include <rocksdb/db.h>
#include <Common/QueryContext.h>
namespace local_engine
{
static std::string getObjectKeyCompatiblePrefix(
const DB::IObjectStorage & object_storage, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
return config.getString(config_prefix + ".key_compatibility_prefix", object_storage.getCommonKeyPrefix());
}
DB::MetadataStoragePtr MetadataStorageFromRocksDB::create(
const std::string &,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DB::ObjectStoragePtr object_storage)
{
auto metadata_path = config.getString(config_prefix + ".metadata_path");
size_t clean_meta_task_interval_seconds = config.getUInt(config_prefix + ".clean_meta_task_interval_seconds", 60 * 60 * 12);
fs::create_directories(metadata_path);
auto key_compatibility_prefix = getObjectKeyCompatiblePrefix(*object_storage, config, config_prefix);
return std::make_shared<MetadataStorageFromRocksDB>(
key_compatibility_prefix, metadata_path, object_storage, clean_meta_task_interval_seconds);
}
MetadataStorageFromRocksDB::MetadataStorageFromRocksDB(
const String & compatible_key_prefix,
const String & rocksdb_dir,
DB::ObjectStoragePtr & object_storage_,
size_t metadata_clean_task_interval_seconds_)
: compatible_key_prefix(compatible_key_prefix)
, rocksdb_dir(rocksdb_dir)
, object_storage(object_storage_)
, metadata_clean_task_interval_seconds(metadata_clean_task_interval_seconds_)
{
rocksdb::Options options;
options.create_if_missing = true;
throwRockDBErrorNotOk(rocksdb::DB::Open(options, rocksdb_dir, &rocksdb));
metadata_clean_task = QueryContext::globalContext()->getSchedulePool().createTask(
"MetadataStorageFromRocksDB", [this] { cleanOutdatedMetadataThreadFunc(); });
metadata_clean_task->scheduleAfter(metadata_clean_task_interval_seconds * 1000);
logger = getLogger("MetadataStorageFromRocksDB");
}
DB::MetadataTransactionPtr MetadataStorageFromRocksDB::createTransaction()
{
return std::make_shared<MetadataStorageFromRocksDBTransaction>(*this);
}
const std::string & MetadataStorageFromRocksDB::getPath() const
{
return rocksdb_dir;
}
DB::MetadataStorageType MetadataStorageFromRocksDB::getType() const
{
return DB::MetadataStorageType::None;
}
bool MetadataStorageFromRocksDB::existsFileOrDirectory(const std::string & path) const
{
return exist(getRocksDB(), path);
}
bool MetadataStorageFromRocksDB::existsFile(const std::string & path) const
{
std::string data;
return tryGetData(getRocksDB(), path, &data) && data != RocksDBCreateDirectoryOperation::DIR_DATA;
}
bool MetadataStorageFromRocksDB::existsDirectory(const std::string & path) const
{
std::string data;
return tryGetData(getRocksDB(), path, &data) && data == RocksDBCreateDirectoryOperation::DIR_DATA;
}
uint64_t MetadataStorageFromRocksDB::getFileSize(const std::string & path) const
{
return readMetadata(path)->getTotalSizeBytes();
}
Poco::Timestamp MetadataStorageFromRocksDB::getLastModified(const std::string & /*path*/) const
{
return {};
}
bool MetadataStorageFromRocksDB::supportsChmod() const
{
return false;
}
bool MetadataStorageFromRocksDB::supportsStat() const
{
return false;
}
bool MetadataStorageFromRocksDB::supportsPartitionCommand(const DB::PartitionCommand & command) const
{
return false;
}
std::vector<std::string> MetadataStorageFromRocksDB::listDirectory(const std::string & path) const
{
return listKeys(getRocksDB(), path);
}
DB::DirectoryIteratorPtr MetadataStorageFromRocksDB::iterateDirectory(const std::string & path) const
{
auto files = listKeys(getRocksDB(), path);
std::vector<std::filesystem::path> paths;
paths.reserve(files.size());
for (const auto & file : files)
paths.emplace_back(file);
return std::make_unique<DB::StaticDirectoryIterator>(std::move(paths));
}
uint32_t MetadataStorageFromRocksDB::getHardlinkCount(const std::string & /*path*/) const
{
return 0;
}
DB::StoredObjects MetadataStorageFromRocksDB::getStorageObjects(const std::string & path) const
{
auto metadata = readMetadata(path);
const auto & keys_with_meta = metadata->getKeysWithMeta();
DB::StoredObjects objects;
objects.reserve(keys_with_meta.size());
for (const auto & [object_key, object_meta] : keys_with_meta)
objects.emplace_back(object_key.serialize(), path, object_meta.size_bytes, object_meta.offset);
return objects;
}
DB::DiskObjectStorageMetadataPtr MetadataStorageFromRocksDB::readMetadata(const std::string & path) const
{
std::shared_lock lock(metadata_mutex);
return readMetadataUnlocked(path, lock);
}
DB::DiskObjectStorageMetadataPtr
MetadataStorageFromRocksDB::readMetadataUnlocked(const std::string & path, std::unique_lock<DB::SharedMutex> &) const
{
auto metadata = std::make_unique<DB::DiskObjectStorageMetadata>(compatible_key_prefix, path);
auto str = getData(getRocksDB(), path);
metadata->deserializeFromString(str);
return metadata;
}
DB::DiskObjectStorageMetadataPtr
MetadataStorageFromRocksDB::readMetadataUnlocked(const std::string & path, std::shared_lock<DB::SharedMutex> &) const
{
auto metadata = std::make_unique<DB::DiskObjectStorageMetadata>(compatible_key_prefix, path);
auto str = getData(getRocksDB(), path);
metadata->deserializeFromString(str);
return metadata;
}
std::string MetadataStorageFromRocksDB::readFileToString(const std::string & path) const
{
return getData(getRocksDB(), path);
}
void MetadataStorageFromRocksDB::shutdown()
{
metadata_clean_task->deactivate();
if (rocksdb)
{
rocksdb->Close();
rocksdb = nullptr;
}
}
void MetadataStorageFromRocksDB::cleanOutdatedMetadataThreadFunc()
{
LOG_INFO(logger, "start to clean disk metadata in rocksdb.");
std::queue<String> part_queue;
size_t total_count_remove = 0;
auto removeParts = [&]
{
while (!part_queue.empty())
{
auto meta_name = part_queue.front();
part_queue.pop();
std::filesystem::path meta_path(meta_name);
auto part_path = meta_path.parent_path();
auto files = listDirectory(part_path);
total_count_remove += (files.size() + 1);
getRocksDB().DeleteRange({}, part_path.generic_string(), files.back());
getRocksDB().Delete({}, files.back());
}
};
auto * it = getRocksDB().NewIterator({});
String prev_key;
String prev_data;
for (it->SeekToFirst(); it->Valid(); it->Next())
{
auto file_name = it->key().ToString();
// mark outdated part
if (isMergeTreePartMetaDataFile(file_name))
{
auto objects = getStorageObjects(it->key().ToString());
if (!object_storage->exists(objects.front()))
part_queue.push(file_name);
}
// clean empty directory
if (!prev_key.empty() && !file_name.starts_with(prev_key) && prev_data == RocksDBCreateDirectoryOperation::DIR_DATA)
{
getRocksDB().Delete({}, prev_key);
total_count_remove ++;
}
if (part_queue.size() > 10000)
{
removeParts();
}
}
removeParts();
rocksdb::Slice begin(nullptr, 0);
rocksdb::Slice end(nullptr, 0);
rocksdb::Status s = getRocksDB().CompactRange({}, &begin, &end);
LOG_INFO(logger, "Clean meta finish, totally clean {} meta", total_count_remove);
metadata_clean_task->scheduleAfter(metadata_clean_task_interval_seconds * 1000);
}
DB::SharedMutex & MetadataStorageFromRocksDB::getMetadataMutex() const
{
return metadata_mutex;
}
rocksdb::DB & MetadataStorageFromRocksDB::getRocksDB() const
{
if (!rocksdb)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "RocksDB is not initialized");
return *rocksdb;
}
void MetadataStorageFromRocksDBTransaction::commit()
{
commitImpl(metadata_storage.getMetadataMutex());
}
const DB::IMetadataStorage & MetadataStorageFromRocksDBTransaction::getStorageForNonTransactionalReads() const
{
return metadata_storage;
}
bool MetadataStorageFromRocksDBTransaction::supportsChmod() const
{
return false;
}
void MetadataStorageFromRocksDBTransaction::createEmptyMetadataFile(const std::string & path)
{
auto metadata = std::make_unique<DB::DiskObjectStorageMetadata>(metadata_storage.compatible_key_prefix, path);
writeStringToFile(path, metadata->serializeToString());
}
void MetadataStorageFromRocksDBTransaction::createMetadataFile(const std::string & path, DB::ObjectStorageKey key, uint64_t size_in_bytes)
{
auto metadata = std::make_unique<DB::DiskObjectStorageMetadata>(metadata_storage.compatible_key_prefix, path);
metadata->addObject(std::move(key), size_in_bytes);
auto data = metadata->serializeToString();
if (!data.empty())
addOperation(std::make_unique<RocksDBWriteFileOperation>(path, metadata_storage.getRocksDB(), data));
}
void MetadataStorageFromRocksDBTransaction::writeStringToFile(const std::string & path, const std::string & data)
{
addOperation(std::make_unique<RocksDBWriteFileOperation>(path, metadata_storage.getRocksDB(), data));
}
void MetadataStorageFromRocksDBTransaction::createDirectory(const std::string & path)
{
addOperation(std::make_unique<RocksDBCreateDirectoryOperation>(path, metadata_storage.getRocksDB()));
}
void MetadataStorageFromRocksDBTransaction::createDirectoryRecursive(const std::string & path)
{
addOperation(std::make_unique<RocksDBCreateDirectoryRecursiveOperation>(path, metadata_storage.getRocksDB()));
}
void MetadataStorageFromRocksDBTransaction::removeDirectory(const std::string & path)
{
addOperation(std::make_unique<RocksDBRemoveDirectoryOperation>(path, metadata_storage.getRocksDB()));
}
void MetadataStorageFromRocksDBTransaction::removeRecursive(const std::string & path)
{
addOperation(std::make_unique<RocksDBRemoveRecursiveOperation>(path, metadata_storage.getRocksDB()));
}
void MetadataStorageFromRocksDBTransaction::unlinkFile(const std::string & path)
{
addOperation(std::make_unique<RocksDBUnlinkFileOperation>(path, metadata_storage.getRocksDB()));
}
}
#endif