src/config/config.cc (933 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include "config.h"

#include <fmt/format.h>
#include <rocksdb/env.h>
#include <spdlog/spdlog.h>
#include <strings.h>

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <fstream>
#include <iostream>
#include <iterator>
#include <string>
#include <utility>
#include <vector>

#include "common/string_util.h"
#include "config_type.h"
#include "config_util.h"
#include "parse_util.h"
#include "rocksdb/compression_type.h"
#include "server/server.h"
#include "status.h"
#include "storage/redis_metadata.h"

// Built-in defaults for the filesystem locations and the bind address.
constexpr const char *kDefaultDir = "/tmp/kvrocks";
constexpr const char *kDefaultBackupDir = "/tmp/kvrocks/backup";
constexpr const char *kDefaultPidfile = "/tmp/kvrocks/kvrocks.pid";
constexpr const char *kDefaultBindAddress = "127.0.0.1";

// Error texts shared by several rocksdb.* callbacks.
constexpr const char *errBlobDbNotEnabled = "Must set rocksdb.enable_blob_files to yes first.";
constexpr const char *errLevelCompactionDynamicLevelBytesNotSet =
    "Must set rocksdb.level_compaction_dynamic_level_bytes yes first.";

namespace {

// Converts one of the engine option tables into the {name, value} list consumed by EnumField.
template <typename Enum, typename OptionTable>
std::vector<ConfigEnum<Enum>> BuildConfigEnums(const OptionTable &table) {
  std::vector<ConfigEnum<Enum>> enums;
  enums.reserve(table.size());
  for (const auto &entry : table) {
    enums.push_back({entry.name, entry.type});
  }
  return enums;
}

}  // namespace

// Textual spellings accepted for each enum-valued configuration field.
const std::vector<ConfigEnum<SupervisedMode>> supervised_modes{
    {"no", kSupervisedNone},
    {"auto", kSupervisedAutoDetect},
    {"upstart", kSupervisedUpStart},
    {"systemd", kSupervisedSystemd},
};

const std::vector<ConfigEnum<JsonStorageFormat>> json_storage_formats{
    {"json", JsonStorageFormat::JSON},
    {"cbor", JsonStorageFormat::CBOR},
};

const std::vector<ConfigEnum<rocksdb::CompressionType>> compression_types{
    BuildConfigEnums<rocksdb::CompressionType>(engine::CompressionOptions)};

const std::vector<ConfigEnum<rocksdb::CompressionType>> wal_compression_types{
    BuildConfigEnums<rocksdb::CompressionType>(engine::WalCompressionOptions)};

const std::vector<ConfigEnum<BlockCacheType>> cache_types{BuildConfigEnums<BlockCacheType>(engine::CacheOptions)};

const std::vector<ConfigEnum<MigrationType>> migration_types{
    {"redis-command", MigrationType::kRedisCommand},
    {"raw-key-value", MigrationType::kRawKeyValue},
};

// Returns `s` without its leading "rocksdb." (matched case-insensitively); any other
// input is returned unchanged.
std::string TrimRocksDbPrefix(std::string s) {
  constexpr std::string_view rocksdb_prefix = "rocksdb.";
  if (util::StartsWithICase(s, rocksdb_prefix)) {
    return s.substr(rocksdb_prefix.size());
  }
  return s;
}

// Applies `compression` to every LSM level from `compression_start_level` upwards and
// "kNoCompression" to the levels below it, through the "compression_per_level" option of
// all column families. A null `srv` makes this a no-op that reports success.
Status SetRocksdbCompression(Server *srv, const rocksdb::CompressionType compression,
                             const size_t compression_start_level) {
  if (srv == nullptr) return Status::OK();

  // Translate the enum value into the option string stored in the engine table.
  std::string compressed_level_value;
  for (const auto &candidate : engine::CompressionOptions) {
    if (candidate.type != compression) continue;
    compressed_level_value = candidate.val;
    break;
  }
  if (compressed_level_value.empty()) {
    return {Status::NotOK, "Invalid compression type"};
  }
  if (compression_start_level >= KVROCKS_MAX_LSM_LEVEL) {
    return {Status::NotOK, "compression_start_level must be < " + std::to_string(KVROCKS_MAX_LSM_LEVEL)};
  }

  std::vector<std::string> per_level_values;
  per_level_values.reserve(KVROCKS_MAX_LSM_LEVEL);
  for (size_t level = 0; level < KVROCKS_MAX_LSM_LEVEL; level++) {
    if (level < compression_start_level) {
      per_level_values.emplace_back("kNoCompression");
    } else {
      per_level_values.emplace_back(compressed_level_value);
    }
  }

  const std::string compression_per_level = util::StringJoin(
      per_level_values, [](const auto &item) -> decltype(auto) { return item; }, ":");
  return srv->storage->SetOptionForAllColumnFamilies("compression_per_level", compression_per_level);
}

// Registers every known configuration key in `fields_` and then installs the
// per-field validators and callbacks.
Config::Config() {
  // One row of the registration table: key name, readonly flag and the heap-allocated field
  // object, which the row owns until it is moved into `fields_`.
  struct FieldWrapper {
    std::string name;
    bool readonly;
    std::unique_ptr<ConfigField> field;

    FieldWrapper(std::string field_name, bool is_readonly, ConfigField *raw_field)
        : name(std::move(field_name)), readonly(is_readonly), field(raw_field) {}
  };

  // NOTE(review): the numeric field constructors appear to take (target, default, min, max);
  // confirm against config_type.h.
  FieldWrapper fields[] = {
      {"daemonize", true, new YesNoField(&daemonize, false)},
      {"bind", true, new StringField(&binds_str_, "")},
      {"port", true, new UInt32Field(&port, kDefaultPort, 1, PORT_LIMIT)},
      {"socket-fd", true, new IntField(&socket_fd, -1, -1, 1 << 16)},
#ifdef ENABLE_OPENSSL
      {"tls-port", true, new UInt32Field(&tls_port, 0, 0, PORT_LIMIT)},
      {"tls-cert-file", false, new StringField(&tls_cert_file, "")},
      {"tls-key-file", false, new StringField(&tls_key_file, "")},
      {"tls-key-file-pass", false, new StringField(&tls_key_file_pass, "")},
      {"tls-ca-cert-file", false, new StringField(&tls_ca_cert_file, "")},
      {"tls-ca-cert-dir", false, new StringField(&tls_ca_cert_dir, "")},
      {"tls-protocols", false, new StringField(&tls_protocols, "")},
      {"tls-auth-clients", false, new StringField(&tls_auth_clients, "")},
      {"tls-ciphers", false, new StringField(&tls_ciphers, "")},
      {"tls-ciphersuites", false, new StringField(&tls_ciphersuites, "")},
      {"tls-prefer-server-ciphers", false, new YesNoField(&tls_prefer_server_ciphers, false)},
      {"tls-session-caching", false, new YesNoField(&tls_session_caching, true)},
      {"tls-session-cache-size", false, new IntField(&tls_session_cache_size, 1024 * 20, 0, INT_MAX)},
      {"tls-session-cache-timeout", false, new IntField(&tls_session_cache_timeout, 300, 0, INT_MAX)},
      {"tls-replication", true, new YesNoField(&tls_replication, false)},
#endif
      {"workers", false, new IntField(&workers, 8, 1, 256)},
      {"timeout", false, new IntField(&timeout, 0, 0, INT_MAX)},
      {"tcp-backlog", true, new IntField(&backlog, 511, 0, INT_MAX)},
      {"maxclients", false, new IntField(&maxclients, 10240, 0, INT_MAX)},
      {"max-backup-to-keep", false, new IntField(&max_backup_to_keep, 1, 0, 1)},
      {"max-backup-keep-hours", false, new IntField(&max_backup_keep_hours, 0, 0, INT_MAX)},
      {"master-use-repl-port", false, new YesNoField(&master_use_repl_port, false)},
      {"requirepass", false, new StringField(&requirepass, "")},
      {"masterauth", false, new StringField(&masterauth, "")},
      {"slaveof", true, new StringField(&slaveof_, "")},
      {"replicaof", true, new StringField(&slaveof_, "")},
      {"compact-cron", false, new StringField(&compact_cron_str_, "")},
      {"bgsave-cron", false, new StringField(&bgsave_cron_str_, "")},
      {"dbsize-scan-cron", false, new StringField(&dbsize_scan_cron_str_, "")},
      {"replica-announce-ip", false, new StringField(&replica_announce_ip, "")},
      {"replica-announce-port", false, new UInt32Field(&replica_announce_port, 0, 0, PORT_LIMIT)},
      {"compaction-checker-range", false, new StringField(&compaction_checker_range_str_, "")},
      {"compaction-checker-cron", false, new StringField(&compaction_checker_cron_str_, "")},
      {"force-compact-file-age", false, new Int64Field(&force_compact_file_age, 2 * 24 * 3600, 60, INT64_MAX)},
      {"force-compact-file-min-deleted-percentage", false,
       new IntField(&force_compact_file_min_deleted_percentage, 10, 1, 100)},
      {"db-name", true, new StringField(&db_name, "change.me.db")},
      {"dir", true, new StringField(&dir, kDefaultDir)},
      {"backup-dir", false, new StringField(&backup_dir, kDefaultBackupDir)},
      {"log-dir", true, new StringField(&log_dir, "")},
      {"log-level", false, new EnumField<spdlog::level::level_enum>(&log_level, log_levels, spdlog::level::info)},
      {"pidfile", true, new StringField(&pidfile, kDefaultPidfile)},
      {"max-io-mb", false, new IntField(&max_io_mb, 0, 0, INT_MAX)},
      {"max-bitmap-to-string-mb", false, new IntField(&max_bitmap_to_string_mb, 16, 0, INT_MAX)},
      {"max-db-size", false, new IntField(&max_db_size, 0, 0, INT_MAX)},
      {"max-replication-mb", false, new IntField(&max_replication_mb, 0, 0, INT_MAX)},
      {"supervised", true, new EnumField<SupervisedMode>(&supervised_mode, supervised_modes, kSupervisedNone)},
      {"slave-serve-stale-data", false, new YesNoField(&slave_serve_stale_data, true)},
      {"slave-empty-db-before-fullsync", false, new YesNoField(&slave_empty_db_before_fullsync, false)},
      {"slave-priority", false, new IntField(&slave_priority, 100, 0, INT_MAX)},
      {"slave-read-only", false, new YesNoField(&slave_readonly, true)},
      {"replication-connect-timeout-ms", false, new IntField(&replication_connect_timeout_ms, 3100, 0, INT_MAX)},
      {"replication-recv-timeout-ms", false, new IntField(&replication_recv_timeout_ms, 3200, 0, INT_MAX)},
      {"use-rsid-psync", true, new YesNoField(&use_rsid_psync, false)},
      {"profiling-sample-ratio", false, new IntField(&profiling_sample_ratio, 0, 0, 100)},
      {"profiling-sample-record-max-len", false, new IntField(&profiling_sample_record_max_len, 256, 0, INT_MAX)},
      {"profiling-sample-record-threshold-ms", false,
       new IntField(&profiling_sample_record_threshold_ms, 100, 0, INT_MAX)},
      {"slowlog-log-slower-than", false, new IntField(&slowlog_log_slower_than, 200000, -1, INT_MAX)},
      {"profiling-sample-commands", false, new StringField(&profiling_sample_commands_str_, "")},
      {"slowlog-max-len", false, new IntField(&slowlog_max_len, 128, 0, INT_MAX)},
      {"purge-backup-on-fullsync", false, new YesNoField(&purge_backup_on_fullsync, false)},
      {"rename-command", true, new MultiStringField(&rename_command_, std::vector<std::string>{})},
      {"auto-resize-block-and-sst", false, new YesNoField(&auto_resize_block_and_sst, true)},
      {"fullsync-recv-file-delay", false, new IntField(&fullsync_recv_file_delay, 0, 0, INT_MAX)},
      {"cluster-enabled", true, new YesNoField(&cluster_enabled, false)},
      {"migrate-speed", false, new IntField(&migrate_speed, 4096, 0, INT_MAX)},
      {"migrate-pipeline-size", false, new IntField(&pipeline_size, 16, 1, INT_MAX)},
      {"migrate-sequence-gap", false, new IntField(&sequence_gap, 10000, 1, INT_MAX)},
      {"migrate-type", false,
       new EnumField<MigrationType>(&migrate_type, migration_types, MigrationType::kRedisCommand)},
      {"migrate-batch-size-kb", false, new IntField(&migrate_batch_size_kb, 16, 1, INT_MAX)},
      {"migrate-batch-rate-limit-mb", false, new IntField(&migrate_batch_rate_limit_mb, 16, 0, INT_MAX)},
      {"unixsocket", true, new StringField(&unixsocket, "")},
      {"unixsocketperm", true, new OctalField(&unixsocketperm, 0777, 1, INT_MAX)},
      {"log-retention-days", true, new IntField(&log_retention_days, -1, -1, INT_MAX)},
      {"persist-cluster-nodes-enabled", false, new YesNoField(&persist_cluster_nodes_enabled, true)},
      {"redis-cursor-compatible", false, new YesNoField(&redis_cursor_compatible, true)},
      {"resp3-enabled", false, new YesNoField(&resp3_enabled, true)},
      {"repl-namespace-enabled", false, new YesNoField(&repl_namespace_enabled, false)},
      {"proto-max-bulk-len", false,
       new IntWithUnitField<uint64_t>(&proto_max_bulk_len, std::to_string(512 * MiB), 1 * MiB, UINT64_MAX)},
      {"json-max-nesting-depth", false, new IntField(&json_max_nesting_depth, 1024, 0, INT_MAX)},
      {"json-storage-format", false,
       new EnumField<JsonStorageFormat>(&json_storage_format, json_storage_formats, JsonStorageFormat::JSON)},
      {"txn-context-enabled", true, new YesNoField(&txn_context_enabled, false)},
      {"skip-block-cache-deallocation-on-close", false,
       new YesNoField(&skip_block_cache_deallocation_on_close, false)},
      {"histogram-bucket-boundaries", true, new StringField(&histogram_bucket_boundaries_str_, "")},

      /* rocksdb options */
      {"rocksdb.compression", false,
       new EnumField<rocksdb::CompressionType>(&rocks_db.compression, compression_types,
                                               rocksdb::CompressionType::kNoCompression)},
      {"rocksdb.compression_level", true, new IntField(&rocks_db.compression_level, 32767, INT_MIN, INT_MAX)},
      {"rocksdb.compression_start_level", false,
       new IntField(&rocks_db.compression_start_level, 2, 0, KVROCKS_MAX_LSM_LEVEL - 1)},
      {"rocksdb.block_size", true, new IntField(&rocks_db.block_size, 16384, 0, INT_MAX)},
      {"rocksdb.max_open_files", false, new IntField(&rocks_db.max_open_files, 8096, -1, INT_MAX)},
      {"rocksdb.write_buffer_size", false, new IntField(&rocks_db.write_buffer_size, 64, 0, 4096)},
      {"rocksdb.max_write_buffer_number", false, new IntField(&rocks_db.max_write_buffer_number, 4, 0, 256)},
      {"rocksdb.target_file_size_base", false, new IntField(&rocks_db.target_file_size_base, 128, 1, 1024)},
      {"rocksdb.max_background_compactions", false, new IntField(&rocks_db.max_background_compactions, 2, -1, 32)},
      {"rocksdb.max_background_flushes", true, new IntField(&rocks_db.max_background_flushes, 2, -1, 32)},
      {"rocksdb.max_subcompactions", false, new IntField(&rocks_db.max_subcompactions, 2, 0, 16)},
      {"rocksdb.delayed_write_rate", false, new Int64Field(&rocks_db.delayed_write_rate, 0, 0, INT64_MAX)},
      {"rocksdb.wal_compression", true,
       new EnumField<rocksdb::CompressionType>(&rocks_db.wal_compression, wal_compression_types,
                                               rocksdb::CompressionType::kNoCompression)},
      {"rocksdb.wal_ttl_seconds", true, new IntField(&rocks_db.wal_ttl_seconds, 3 * 3600, 0, INT_MAX)},
      {"rocksdb.wal_size_limit_mb", true, new IntField(&rocks_db.wal_size_limit_mb, 16384, 0, INT_MAX)},
      {"rocksdb.dump_malloc_stats", true, new YesNoField(&rocks_db.dump_malloc_stats, true)},
      {"rocksdb.max_total_wal_size", false, new IntField(&rocks_db.max_total_wal_size, 64 * 4 * 2, 0, INT_MAX)},
      {"rocksdb.disable_auto_compactions", false, new YesNoField(&rocks_db.disable_auto_compactions, false)},
      {"rocksdb.enable_pipelined_write", true, new YesNoField(&rocks_db.enable_pipelined_write, false)},
      {"rocksdb.stats_dump_period_sec", false, new IntField(&rocks_db.stats_dump_period_sec, 0, 0, INT_MAX)},
      {"rocksdb.cache_index_and_filter_blocks", true, new YesNoField(&rocks_db.cache_index_and_filter_blocks, true)},
      {"rocksdb.block_cache_size", true, new IntField(&rocks_db.block_cache_size, 0, 0, INT_MAX)},
      {"rocksdb.block_cache_type", true,
       new EnumField<BlockCacheType>(&rocks_db.block_cache_type, cache_types, BlockCacheType::kCacheTypeLRU)},
      {"rocksdb.subkey_block_cache_size", true, new IntField(&rocks_db.subkey_block_cache_size, 2048, 0, INT_MAX)},
      {"rocksdb.metadata_block_cache_size", true,
       new IntField(&rocks_db.metadata_block_cache_size, 2048, 0, INT_MAX)},
      {"rocksdb.share_metadata_and_subkey_block_cache", true,
       new YesNoField(&rocks_db.share_metadata_and_subkey_block_cache, true)},
      {"rocksdb.row_cache_size", true, new IntField(&rocks_db.row_cache_size, 0, 0, INT_MAX)},
      {"rocksdb.compaction_readahead_size", false,
       new IntField(&rocks_db.compaction_readahead_size, 2 * MiB, 0, 64 * MiB)},
      {"rocksdb.level0_slowdown_writes_trigger", false,
       new IntField(&rocks_db.level0_slowdown_writes_trigger, 20, 1, 1024)},
      {"rocksdb.level0_stop_writes_trigger", false, new IntField(&rocks_db.level0_stop_writes_trigger, 40, 1, 1024)},
      {"rocksdb.level0_file_num_compaction_trigger", false,
       new IntField(&rocks_db.level0_file_num_compaction_trigger, 4, 1, 1024)},
      {"rocksdb.enable_blob_files", false, new YesNoField(&rocks_db.enable_blob_files, false)},
      {"rocksdb.min_blob_size", false, new IntField(&rocks_db.min_blob_size, 4096, 0, INT_MAX)},
      {"rocksdb.blob_file_size", false, new IntField(&rocks_db.blob_file_size, 268435456, 0, INT_MAX)},
      {"rocksdb.enable_blob_garbage_collection", false,
       new YesNoField(&rocks_db.enable_blob_garbage_collection, true)},
      {"rocksdb.blob_garbage_collection_age_cutoff", false,
       new IntField(&rocks_db.blob_garbage_collection_age_cutoff, 25, 0, 100)},
      {"rocksdb.max_bytes_for_level_base", false,
       new IntField(&rocks_db.max_bytes_for_level_base, 268435456, 0, INT_MAX)},
      {"rocksdb.max_bytes_for_level_multiplier", false,
       new IntField(&rocks_db.max_bytes_for_level_multiplier, 10, 1, 100)},
      {"rocksdb.level_compaction_dynamic_level_bytes", false,
       new YesNoField(&rocks_db.level_compaction_dynamic_level_bytes, false)},
      {"rocksdb.max_background_jobs", false, new IntField(&rocks_db.max_background_jobs, 4, 0, 32)},
      {"rocksdb.rate_limiter_auto_tuned", true, new YesNoField(&rocks_db.rate_limiter_auto_tuned, true)},
      {"rocksdb.avoid_unnecessary_blocking_io", true, new YesNoField(&rocks_db.avoid_unnecessary_blocking_io, true)},
      {"rocksdb.partition_filters", true, new YesNoField(&rocks_db.partition_filters, true)},
      {"rocksdb.max_compaction_bytes", false, new Int64Field(&rocks_db.max_compaction_bytes, 0, 0, INT64_MAX)},

      /* rocksdb write options */
      {"rocksdb.write_options.sync", true, new YesNoField(&rocks_db.write_options.sync, false)},
      {"rocksdb.write_options.disable_wal", true, new YesNoField(&rocks_db.write_options.disable_wal, false)},
      {"rocksdb.write_options.no_slowdown", true, new YesNoField(&rocks_db.write_options.no_slowdown, false)},
      {"rocksdb.write_options.low_pri", true, new YesNoField(&rocks_db.write_options.low_pri, false)},
      {"rocksdb.write_options.memtable_insert_hint_per_batch", true,
       new YesNoField(&rocks_db.write_options.memtable_insert_hint_per_batch, false)},
      {"rocksdb.write_options.write_batch_max_bytes", false,
       new IntField(&rocks_db.write_options.write_batch_max_bytes, 0, 0, INT_MAX)},

      /* rocksdb read options */
      {"rocksdb.read_options.async_io", false, new YesNoField(&rocks_db.read_options.async_io, true)},
  };

  // Stamp the readonly flag onto each field object, then hand its ownership to `fields_`.
  for (auto &entry : fields) {
    entry.field->readonly = entry.readonly;
    fields_.emplace(std::move(entry.name), std::move(entry.field));
  }

  initFieldValidator();
  initFieldCallback();
}

// The validate function would be invoked before the field was set,
// to make sure that new value is valid.
void Config::initFieldValidator() { std::map<std::string, ValidateFn> validators = { {"requirepass", [this]([[maybe_unused]] const std::string &k, const std::string &v) -> Status { if (v.empty() && !load_tokens.empty()) { return {Status::NotOK, "requirepass empty not allowed while the namespace exists"}; } if (load_tokens.find(v) != load_tokens.end()) { return {Status::NotOK, "requirepass is duplicated with namespace tokens"}; } return Status::OK(); }}, {"masterauth", [this]([[maybe_unused]] const std::string &k, const std::string &v) -> Status { if (load_tokens.find(v) != load_tokens.end()) { return {Status::NotOK, "masterauth is duplicated with namespace tokens"}; } return Status::OK(); }}, {"compact-cron", [this]([[maybe_unused]] const std::string &k, const std::string &v) -> Status { std::vector<std::string> args = util::Split(v, " \t"); return compact_cron.SetScheduleTime(args); }}, {"bgsave-cron", [this]([[maybe_unused]] const std::string &k, const std::string &v) -> Status { std::vector<std::string> args = util::Split(v, " \t"); return bgsave_cron.SetScheduleTime(args); }}, {"dbsize-scan-cron", [this]([[maybe_unused]] const std::string &k, const std::string &v) -> Status { std::vector<std::string> args = util::Split(v, " \t"); return dbsize_scan_cron.SetScheduleTime(args); }}, {"compaction-checker-range", [this]([[maybe_unused]] const std::string &k, const std::string &v) -> Status { if (!compaction_checker_cron_str_.empty()) { return {Status::NotOK, "compaction-checker-range cannot be set while compaction-checker-cron is set"}; } if (v.empty()) { compaction_checker_cron.Clear(); return Status::OK(); } return compaction_checker_cron.SetScheduleTime({"*", v, "*", "*", "*"}); }}, {"compaction-checker-cron", [this]([[maybe_unused]] const std::string &k, const std::string &v) -> Status { std::vector<std::string> args = util::Split(v, " \t"); return compaction_checker_cron.SetScheduleTime(args); }}, {"rename-command", []([[maybe_unused]] const std::string &k, 
const std::string &v) -> Status { std::vector<std::string> all_args = util::Split(v, "\n"); for (auto &p : all_args) { std::vector<std::string> args = util::Split(p, " \t"); if (args.size() != 2) { return {Status::NotOK, "Invalid rename-command format"}; } auto commands = redis::CommandTable::Get(); auto cmd_iter = commands->find(util::ToLower(args[0])); if (cmd_iter == commands->end()) { return {Status::NotOK, "No such command in rename-command"}; } if (args[1] != "\"\"") { auto new_command_name = util::ToLower(args[1]); if (commands->find(new_command_name) != commands->end()) { return {Status::NotOK, "Target command name already exists"}; } (*commands)[new_command_name] = cmd_iter->second; } commands->erase(cmd_iter); } return Status::OK(); }}, }; for (const auto &iter : validators) { auto field_iter = fields_.find(iter.first); if (field_iter != fields_.end()) { field_iter->second->validate = iter.second; } } } // The callback function would be invoked after the field was set, // it may change related fields or re-format the field. for example, // when the 'dir' was set, the db-dir or backup-dir should be reset as well. 
void Config::initFieldCallback() { auto set_db_option_cb = [](Server *srv, const std::string &k, const std::string &v) -> Status { if (!srv) return Status::OK(); // srv is nullptr when load config from file return srv->storage->SetDBOption(TrimRocksDbPrefix(k), v); }; auto set_cf_option_cb = [](Server *srv, const std::string &k, const std::string &v) -> Status { if (!srv) return Status::OK(); // srv is nullptr when load config from file return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), v); }; auto set_compression_type_cb = [](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { if (!srv) return Status::OK(); return SetRocksdbCompression(srv, srv->GetConfig()->rocks_db.compression, srv->GetConfig()->rocks_db.compression_start_level); }; auto set_compression_start_level_cb = [](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { if (!srv) return Status::OK(); return SetRocksdbCompression(srv, srv->GetConfig()->rocks_db.compression, srv->GetConfig()->rocks_db.compression_start_level); }; #ifdef ENABLE_OPENSSL auto set_tls_option = [](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) { if (!srv) return Status::OK(); // srv is nullptr when load config from file auto new_ctx = CreateSSLContext(srv->GetConfig()); if (!new_ctx) { return Status(Status::NotOK, "Failed to configure SSL context, check server log for more details"); } srv->ssl_ctx = std::move(new_ctx); return Status::OK(); }; #endif auto replicaof_cb = [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k, const std::string &v) -> Status { if (v.empty()) { return Status::OK(); } std::vector<std::string> args = util::Split(v, " \t"); if (args.size() != 2) return {Status::NotOK, "wrong number of arguments"}; if (args[0] != "no" && args[1] != "one") { master_host = args[0]; auto parse_result = ParseInt<int>(args[1], 
NumericRange<int>{1, PORT_LIMIT - 1}, 10); if (!parse_result) { return {Status::NotOK, "should be between 0 and 65535"}; } master_port = *parse_result; } return Status::OK(); }; std::map<std::string, CallbackFn> callbacks = { {"workers", [](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { if (!srv) return Status::OK(); srv->AdjustWorkerThreads(); return Status::OK(); }}, {"dir", [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { db_dir = dir + "/db"; if (log_dir.empty()) log_dir = dir + ",stdout"; checkpoint_dir = dir + "/checkpoint"; sync_checkpoint_dir = dir + "/sync_checkpoint"; backup_sync_dir = dir + "/backup_for_sync"; if (backup_dir == kDefaultBackupDir) backup_dir = dir + "/backup"; if (pidfile == kDefaultPidfile) pidfile = dir + "/kvrocks.pid"; return Status::OK(); }}, {"backup-dir", [this](Server *srv, [[maybe_unused]] const std::string &k, const std::string &v) -> Status { std::string previous_backup; { // Note: currently, backup_mu_ may block by backing up or purging, // the command may wait for seconds. std::lock_guard<std::mutex> lg(this->backup_mu); previous_backup = std::move(backup_dir); backup_dir = v; } if (!previous_backup.empty() && srv != nullptr && !srv->IsLoading()) { // info() should be called after log is initialized and server is loaded. 
info("change backup dir from {} to {}", previous_backup, v); } return Status::OK(); }}, {"cluster-enabled", [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { if (cluster_enabled) slot_id_encoded = true; return Status::OK(); }}, {"bind", [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k, const std::string &v) -> Status { std::vector<std::string> args = util::Split(v, " \t"); binds = std::move(args); return Status::OK(); }}, {"maxclients", [](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { if (!srv) return Status::OK(); srv->AdjustOpenFilesLimit(); return Status::OK(); }}, {"slaveof", replicaof_cb}, {"replicaof", replicaof_cb}, {"profiling-sample-commands", [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k, const std::string &v) -> Status { std::vector<std::string> cmds = util::Split(v, ","); profiling_sample_all_commands = false; profiling_sample_commands.clear(); for (auto const &cmd : cmds) { if (cmd == "*") { profiling_sample_all_commands = true; profiling_sample_commands.clear(); return Status::OK(); } if (!redis::CommandTable::IsExists(cmd)) { return {Status::NotOK, cmd + " is not Kvrocks supported command"}; } // profiling_sample_commands use command's original name, regardless of rename-command directive profiling_sample_commands.insert(cmd); } return Status::OK(); }}, {"slowlog-max-len", [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { if (!srv) return Status::OK(); srv->GetSlowLog()->SetMaxEntries(slowlog_max_len); return Status::OK(); }}, {"max-db-size", [](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { if (!srv) return Status::OK(); srv->storage->CheckDBSizeLimit(); return Status::OK(); }}, {"max-io-mb", [this](Server *srv, [[maybe_unused]] const std::string &k, 
[[maybe_unused]] const std::string &v) -> Status { if (!srv) return Status::OK(); srv->storage->SetIORateLimit(max_io_mb); return Status::OK(); }}, {"profiling-sample-record-max-len", [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { if (!srv) return Status::OK(); srv->GetPerfLog()->SetMaxEntries(profiling_sample_record_max_len); return Status::OK(); }}, {"migrate-speed", [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { if (!srv) return Status::OK(); if (cluster_enabled) srv->slot_migrator->SetMaxMigrationSpeed(migrate_speed); return Status::OK(); }}, {"migrate-pipeline-size", [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { if (!srv) return Status::OK(); if (cluster_enabled) srv->slot_migrator->SetMaxPipelineSize(pipeline_size); return Status::OK(); }}, {"migrate-sequence-gap", [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { if (!srv) return Status::OK(); if (cluster_enabled) srv->slot_migrator->SetSequenceGapLimit(sequence_gap); return Status::OK(); }}, {"migrate-batch-rate-limit-mb", [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { if (!srv) return Status::OK(); srv->slot_migrator->SetMigrateBatchRateLimit(migrate_batch_rate_limit_mb * MiB); return Status::OK(); }}, {"migrate-batch-size-kb", [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { if (!srv) return Status::OK(); srv->slot_migrator->SetMigrateBatchSize(migrate_batch_size_kb * KiB); return Status::OK(); }}, {"log-level", [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { if (!srv) return Status::OK(); spdlog::set_level(log_level); return Status::OK(); }}, {"persist-cluster-nodes-enabled", 
[this](Server *srv, [[maybe_unused]] const std::string &k, const std::string &v) -> Status { if (!srv || !cluster_enabled) return Status::OK(); auto nodes_file_path = NodesFilePath(); if (v == "yes") { return srv->cluster->DumpClusterNodes(nodes_file_path); } // Remove the cluster nodes file to avoid stale cluster nodes info remove(nodes_file_path.data()); return Status::OK(); }}, {"repl-namespace-enabled", [](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { if (!srv) return Status::OK(); return srv->GetNamespace()->LoadAndRewrite(); }}, {"rocksdb.target_file_size_base", [this](Server *srv, const std::string &k, [[maybe_unused]] const std::string &v) -> Status { if (!srv) return Status::OK(); return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), std::to_string(rocks_db.target_file_size_base * MiB)); }}, {"rocksdb.write_buffer_size", [this](Server *srv, const std::string &k, [[maybe_unused]] const std::string &v) -> Status { if (!srv) return Status::OK(); return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), std::to_string(rocks_db.write_buffer_size * MiB)); }}, {"rocksdb.disable_auto_compactions", [](Server *srv, const std::string &k, const std::string &v) -> Status { if (!srv) return Status::OK(); std::string disable_auto_compactions = v == "yes" ? "true" : "false"; return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), disable_auto_compactions); }}, {"rocksdb.max_total_wal_size", [this](Server *srv, const std::string &k, [[maybe_unused]] const std::string &v) -> Status { if (!srv) return Status::OK(); return srv->storage->SetDBOption(TrimRocksDbPrefix(k), std::to_string(rocks_db.max_total_wal_size * MiB)); }}, {"rocksdb.enable_blob_files", [this](Server *srv, const std::string &k, [[maybe_unused]] const std::string &v) -> Status { if (!srv) return Status::OK(); std::string enable_blob_files = rocks_db.enable_blob_files ? 
"true" : "false"; return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), enable_blob_files); }}, {"rocksdb.min_blob_size", [this](Server *srv, const std::string &k, const std::string &v) -> Status { if (!srv) return Status::OK(); if (!rocks_db.enable_blob_files) { return {Status::NotOK, errBlobDbNotEnabled}; } return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), v); }}, {"rocksdb.blob_file_size", [this](Server *srv, const std::string &k, [[maybe_unused]] const std::string &v) -> Status { if (!srv) return Status::OK(); if (!rocks_db.enable_blob_files) { return {Status::NotOK, errBlobDbNotEnabled}; } return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), std::to_string(rocks_db.blob_file_size)); }}, {"rocksdb.enable_blob_garbage_collection", [this](Server *srv, const std::string &k, const std::string &v) -> Status { if (!srv) return Status::OK(); if (!rocks_db.enable_blob_files) { return {Status::NotOK, errBlobDbNotEnabled}; } std::string enable_blob_garbage_collection = v == "yes" ? 
"true" : "false"; return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), enable_blob_garbage_collection); }}, {"rocksdb.blob_garbage_collection_age_cutoff", [this](Server *srv, const std::string &k, const std::string &v) -> Status { if (!srv) return Status::OK(); if (!rocks_db.enable_blob_files) { return {Status::NotOK, errBlobDbNotEnabled}; } int val = 0; auto parse_result = ParseInt<int>(v, 10); if (!parse_result) { return {Status::NotOK, "Illegal blob_garbage_collection_age_cutoff value."}; } val = *parse_result; if (val < 0 || val > 100) { return {Status::NotOK, "blob_garbage_collection_age_cutoff must >= 0 and <= 100."}; } double cutoff = val / 100.0; return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), std::to_string(cutoff)); }}, {"rocksdb.level_compaction_dynamic_level_bytes", [](Server *srv, const std::string &k, const std::string &v) -> Status { if (!srv) return Status::OK(); std::string level_compaction_dynamic_level_bytes = v == "yes" ? "true" : "false"; return srv->storage->SetDBOption(TrimRocksDbPrefix(k), level_compaction_dynamic_level_bytes); }}, {"rocksdb.max_bytes_for_level_base", [this](Server *srv, const std::string &k, [[maybe_unused]] const std::string &v) -> Status { if (!srv) return Status::OK(); if (!rocks_db.level_compaction_dynamic_level_bytes) { return {Status::NotOK, errLevelCompactionDynamicLevelBytesNotSet}; } return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), std::to_string(rocks_db.max_bytes_for_level_base)); }}, {"rocksdb.max_bytes_for_level_multiplier", [this](Server *srv, const std::string &k, const std::string &v) -> Status { if (!srv) return Status::OK(); if (!rocks_db.level_compaction_dynamic_level_bytes) { return {Status::NotOK, errLevelCompactionDynamicLevelBytesNotSet}; } return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), v); }}, {"rocksdb.max_open_files", set_db_option_cb}, {"rocksdb.stats_dump_period_sec", set_db_option_cb}, 
{"rocksdb.delayed_write_rate", set_db_option_cb}, {"rocksdb.max_background_compactions", set_db_option_cb}, {"rocksdb.max_background_flushes", set_db_option_cb}, {"rocksdb.max_subcompactions", set_db_option_cb}, {"rocksdb.compaction_readahead_size", set_db_option_cb}, {"rocksdb.max_background_jobs", set_db_option_cb}, {"rocksdb.max_compaction_bytes", set_cf_option_cb}, {"rocksdb.max_write_buffer_number", set_cf_option_cb}, {"rocksdb.level0_slowdown_writes_trigger", set_cf_option_cb}, {"rocksdb.level0_stop_writes_trigger", set_cf_option_cb}, {"rocksdb.level0_file_num_compaction_trigger", set_cf_option_cb}, {"rocksdb.compression", set_compression_type_cb}, {"rocksdb.compression_start_level", set_compression_start_level_cb}, #ifdef ENABLE_OPENSSL {"tls-cert-file", set_tls_option}, {"tls-key-file", set_tls_option}, {"tls-key-file-pass", set_tls_option}, {"tls-ca-cert-file", set_tls_option}, {"tls-ca-cert-dir", set_tls_option}, {"tls-protocols", set_tls_option}, {"tls-auth-clients", set_tls_option}, {"tls-ciphers", set_tls_option}, {"tls-ciphersuites", set_tls_option}, {"tls-prefer-server-ciphers", set_tls_option}, {"tls-session-caching", set_tls_option}, {"tls-session-cache-size", set_tls_option}, {"tls-session-cache-timeout", set_tls_option}, #endif {"histogram-bucket-boundaries", [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k, const std::string &v) -> Status { std::vector<std::string> buckets = util::Split(v, ","); histogram_bucket_boundaries.clear(); if (buckets.size() < 1) { return Status::OK(); } for (const auto &bucket_val : buckets) { auto parse_result = ParseFloat<double>(bucket_val); if (!parse_result) { return {Status::NotOK, "The values in the bucket list must be double or integer."}; } histogram_bucket_boundaries.push_back(*parse_result); } if (!std::is_sorted(histogram_bucket_boundaries.begin(), histogram_bucket_boundaries.end())) { return {Status::NotOK, "The values for the histogram must be sorted."}; } return Status::OK(); 
}}, }; for (const auto &iter : callbacks) { auto field_iter = fields_.find(iter.first); if (field_iter != fields_.end()) { field_iter->second->callback = iter.second; } } } std::string Config::NodesFilePath() const { return dir + "/nodes.conf"; } void Config::SetMaster(const std::string &host, uint32_t port) { master_host = host; master_port = port; auto iter = fields_.find("slaveof"); if (iter != fields_.end()) { auto s = iter->second->Set(master_host + " " + std::to_string(master_port)); if (!s.IsOK()) { error("Failed to set the value of 'slaveof' setting: {}", s.Msg()); } } } void Config::ClearMaster() { master_host.clear(); master_port = 0; auto iter = fields_.find("slaveof"); if (iter != fields_.end()) { auto s = iter->second->Set("no one"); if (!s.IsOK()) { error("Failed to clear the value of 'slaveof' setting: {}", s.Msg()); } } } Status Config::parseConfigFromPair(const std::pair<std::string, std::string> &input, int line_number) { std::string field_key = util::ToLower(input.first); constexpr std::string_view ns_str = "namespace."; if (util::StartsWithICase(input.first, ns_str)) { // namespace should keep key case-sensitive field_key = input.first; load_tokens[input.second] = input.first.substr(ns_str.size()); return Status::OK(); } auto iter = fields_.find(field_key); if (iter != fields_.end()) { auto &field = iter->second; field->line_number = line_number; auto s = field->Set(input.second); if (!s.IsOK()) return s.Prefixed(fmt::format("failed to set value of field '{}'", field_key)); } else { std::cout << fmt::format("WARNING: '{}' at line {} is not a valid configuration key.", field_key, line_number) << std::endl; } return Status::OK(); } Status Config::parseConfigFromString(const std::string &input, int line_number) { auto parsed = ParseConfigLine(input); if (!parsed) return parsed.ToStatus().Prefixed("malformed line"); auto kv = std::move(*parsed); if (kv.first.empty() || kv.second.empty()) return Status::OK(); return parseConfigFromPair(kv, 
line_number); } Status Config::finish() { if (requirepass.empty() && !load_tokens.empty()) { return {Status::NotOK, "requirepass empty wasn't allowed while the namespace exists"}; } if ((cluster_enabled) && !load_tokens.empty()) { return {Status::NotOK, "enabled cluster mode wasn't allowed while the namespace exists"}; } if (unixsocket.empty() && binds.size() == 0) { binds.emplace_back(kDefaultBindAddress); } if (cluster_enabled && binds.size() == 0) { return {Status::NotOK, "node is in cluster mode, but TCP listen address " "wasn't specified via configuration file"}; } if (master_port != 0 && binds.size() == 0) { return {Status::NotOK, "replication doesn't support unix socket"}; } if (db_dir.empty()) db_dir = dir + "/db"; if (log_dir.empty()) log_dir = dir; std::vector<std::string> create_dirs = {dir}; for (const auto &name : create_dirs) { auto s = rocksdb::Env::Default()->CreateDirIfMissing(name); if (!s.ok()) return {Status::NotOK, s.ToString()}; } return Status::OK(); } Status Config::Load(const CLIOptions &opts) { if (!opts.conf_file.empty()) { std::ifstream file; std::istream *in = nullptr; if (opts.conf_file == "-") { in = &std::cin; } else { path_ = opts.conf_file; file.open(path_); if (!file.is_open()) { return {Status::NotOK, fmt::format("failed to open file '{}': {}", path_, strerror(errno))}; } in = &file; } std::string line; int line_num = 1; while (in->good() && std::getline(*in, line)) { if (auto s = parseConfigFromString(line, line_num); !s.IsOK()) { return s.Prefixed(fmt::format("at line #L{}", line_num)); } line_num++; } } else { std::cout << "WARNING: No config file specified, default configuration applied. " << "In order to specify a config file, use `kvrocks -c /path/to/kvrocks.conf`." 
<< std::endl; } for (const auto &opt : opts.cli_options) { GET_OR_RET(parseConfigFromPair(opt, -1).Prefixed("CLI config option error")); } for (const auto &iter : fields_) { // line_number = 0 means the user didn't specify the field value // on config file and would use default value, so won't validate here. if (iter.second->line_number != 0 && iter.second->validate) { auto s = iter.second->validate(iter.first, iter.second->ToString()); if (!s.IsOK()) { return s.Prefixed(fmt::format("at line #L{}: {} is invalid", iter.second->line_number, iter.first)); } } } for (const auto &iter : fields_) { if (iter.second->callback) { auto s = iter.second->callback(nullptr, iter.first, iter.second->ToString()); if (!s.IsOK()) { return s.Prefixed(fmt::format("while changing key '{}'", iter.first)); } } } return finish(); } void Config::Get(const std::string &key, std::vector<std::string> *values) const { values->clear(); for (const auto &iter : fields_) { if (util::StringMatch(key, iter.first, true)) { if (iter.second->IsMultiConfig()) { for (const auto &p : util::Split(iter.second->ToString(), "\n")) { values->emplace_back(iter.first); values->emplace_back(p); } } else { values->emplace_back(iter.first); values->emplace_back(iter.second->ToString()); } } } } Status Config::Set(Server *srv, std::string key, const std::string &value) { key = util::ToLower(key); auto iter = fields_.find(key); if (iter == fields_.end() || iter->second->readonly) { return {Status::NotOK, "Unsupported CONFIG parameter: " + key}; } auto &field = iter->second; if (field->validate) { auto s = field->validate(key, value); if (!s.IsOK()) return s.Prefixed("invalid value"); } auto origin_value = field->ToStringForRewrite(); auto s = field->Set(value); if (!s.IsOK()) return s.Prefixed("failed to set new value"); if (field->callback) { s = field->callback(srv, key, value); if (!s.IsOK()) { // rollback the value if the callback failed auto set_status = field->Set(origin_value); if (!set_status.IsOK()) { return 
set_status.Prefixed("failed to rollback the value"); } } return s; } return Status::OK(); } bool Config::checkFieldValueIsDefault(const std::string &key, const std::string &value) const { auto iter = fields_.find(key); return iter != fields_.end() && iter->second->Default() == value; } Status Config::Rewrite(const std::map<std::string, std::string> &tokens) { if (!HasConfigFile()) { return {Status::NotOK, "the server is running without a config file"}; } std::vector<std::string> lines; std::map<std::string, std::string> new_config; for (const auto &iter : fields_) { if (iter.second->IsMultiConfig()) { // We should NOT overwrite the commands which are MultiConfig since it cannot be rewritten in-flight, // so skip it here to avoid rewriting it as new item. continue; } new_config[iter.first] = iter.second->ToStringForRewrite(); } std::string namespace_prefix = "namespace."; if (!repl_namespace_enabled) { // need to rewrite to the configuration if we don't replicate namespaces for (const auto &iter : tokens) { new_config[namespace_prefix + iter.second] = iter.first; } } std::ifstream file(path_); if (file.is_open()) { std::string raw_line; while (file.good() && std::getline(file, raw_line)) { auto parsed = ParseConfigLine(raw_line); if (!parsed || parsed->first.empty()) { lines.emplace_back(raw_line); continue; } auto kv = std::move(*parsed); if (util::StartsWith(kv.first, namespace_prefix)) { // Ignore namespace fields here since we would always rewrite them continue; } auto iter = new_config.find(util::ToLower(kv.first)); if (iter != new_config.end()) { if (!iter->second.empty()) lines.emplace_back(DumpConfigLine({iter->first, iter->second})); new_config.erase(iter); } else { lines.emplace_back(raw_line); } } } file.close(); std::string out_buf; for (const auto &line : lines) { fmt::format_to(std::back_inserter(out_buf), "{}\n", line); } for (const auto &remain : new_config) { if (remain.second.empty() || checkFieldValueIsDefault(remain.first, remain.second)) 
continue; fmt::format_to(std::back_inserter(out_buf), "{}\n", DumpConfigLine({remain.first, remain.second})); } std::string tmp_path = path_ + ".tmp"; remove(tmp_path.data()); std::ofstream output_file(tmp_path, std::ios::out); output_file << out_buf; output_file.close(); if (rename(tmp_path.data(), path_.data()) < 0) { return {Status::NotOK, fmt::format("rename file encounter error: {}", strerror(errno))}; } return Status::OK(); }