in src/config/config.cc [409:767]
// Builds the table of per-option callbacks and attaches each one to the
// matching entry in `fields_`.
//
// Every callback has the shape `(Server *srv, key, value) -> Status`. Callbacks
// that need a live server return OK right away when `srv` is nullptr, which is
// the case while the configuration is being loaded from file. Table entries
// whose key is not present in `fields_` are skipped.
void Config::initFieldCallback() {
  // Forwards the option (with its RocksDB prefix trimmed) through
  // `srv->storage->SetDBOption`.
  auto set_db_option_cb = [](Server *srv, const std::string &k, const std::string &v) -> Status {
    if (!srv) return Status::OK();  // srv is nullptr when load config from file
    return srv->storage->SetDBOption(TrimRocksDbPrefix(k), v);
  };

  // Forwards the option (with its RocksDB prefix trimmed) through
  // `srv->storage->SetOptionForAllColumnFamilies`.
  auto set_cf_option_cb = [](Server *srv, const std::string &k, const std::string &v) -> Status {
    if (!srv) return Status::OK();  // srv is nullptr when load config from file
    return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), v);
  };

  // Shared by "rocksdb.compression" and "rocksdb.compression_start_level":
  // both options are re-applied together from the values already stored in the
  // server's config, so the key/value arguments are not needed.
  auto set_compression_cb = [](Server *srv, [[maybe_unused]] const std::string &k,
                               [[maybe_unused]] const std::string &v) -> Status {
    if (!srv) return Status::OK();  // srv is nullptr when load config from file
    return SetRocksdbCompression(srv, srv->GetConfig()->rocks_db.compression,
                                 srv->GetConfig()->rocks_db.compression_start_level);
  };

#ifdef ENABLE_OPENSSL
  // Any TLS option change rebuilds the whole SSL context from the current
  // config; the old context is only replaced when the new one was created.
  auto set_tls_option = [](Server *srv, [[maybe_unused]] const std::string &k,
                           [[maybe_unused]] const std::string &v) -> Status {
    if (!srv) return Status::OK();  // srv is nullptr when load config from file
    auto new_ctx = CreateSSLContext(srv->GetConfig());
    if (!new_ctx) {
      return Status(Status::NotOK, "Failed to configure SSL context, check server log for more details");
    }
    srv->ssl_ctx = std::move(new_ctx);
    return Status::OK();
  };
#endif

  // Parses "<host> <port>" into master_host / master_port.
  //  - An empty value and the literal pair "no one" leave the master untouched.
  //  - Anything else must carry a valid port; host and port are committed
  //    together only after the port has been validated.
  auto replicaof_cb = [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k,
                             const std::string &v) -> Status {
    if (v.empty()) {
      return Status::OK();
    }
    std::vector<std::string> args = util::Split(v, " \t");
    if (args.size() != 2) return {Status::NotOK, "wrong number of arguments"};

    // Only the exact pair "no one" means "no master". Testing each token on its
    // own would silently ignore inputs such as "no 6379" or "somehost one".
    const bool is_no_one = (args[0] == "no" && args[1] == "one");
    if (is_no_one) return Status::OK();

    auto parse_result = ParseInt<int>(args[1], NumericRange<int>{1, PORT_LIMIT - 1}, 10);
    if (!parse_result) {
      // NOTE(review): the accepted range starts at 1 while the message says 0;
      // the text is kept unchanged in case callers match on it.
      return {Status::NotOK, "should be between 0 and 65535"};
    }
    master_host = args[0];
    master_port = *parse_result;
    return Status::OK();
  };

  const std::map<std::string, CallbackFn> callbacks = {
      {"workers",
       [](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
         if (!srv) return Status::OK();
         srv->AdjustWorkerThreads();
         return Status::OK();
       }},
      {"dir",
       // Re-derives every path that is based on `dir`. Paths the user may have
       // set explicitly (log_dir, backup_dir, pidfile) are only overwritten
       // while they still hold their empty/default value.
       [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k,
              [[maybe_unused]] const std::string &v) -> Status {
         db_dir = dir + "/db";
         // NOTE(review): presumably log_dir accepts a comma-separated list of
         // destinations - confirm against the logging setup.
         if (log_dir.empty()) log_dir = dir + ",stdout";
         checkpoint_dir = dir + "/checkpoint";
         sync_checkpoint_dir = dir + "/sync_checkpoint";
         backup_sync_dir = dir + "/backup_for_sync";
         if (backup_dir == kDefaultBackupDir) backup_dir = dir + "/backup";
         if (pidfile == kDefaultPidfile) pidfile = dir + "/kvrocks.pid";
         return Status::OK();
       }},
      {"backup-dir",
       [this](Server *srv, [[maybe_unused]] const std::string &k, const std::string &v) -> Status {
         std::string previous_backup;
         {
           // Note: currently, backup_mu_ may block by backing up or purging,
           // the command may wait for seconds.
           std::lock_guard<std::mutex> lg(this->backup_mu);
           previous_backup = std::move(backup_dir);
           backup_dir = v;
         }
         if (!previous_backup.empty() && srv != nullptr && !srv->IsLoading()) {
           // info() should be called after log is initialized and server is loaded.
           info("change backup dir from {} to {}", previous_backup, v);
         }
         return Status::OK();
       }},
      {"cluster-enabled",
       // Turning cluster mode on also turns on slot_id_encoded; it is never
       // switched back off here.
       [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k,
              [[maybe_unused]] const std::string &v) -> Status {
         if (cluster_enabled) slot_id_encoded = true;
         return Status::OK();
       }},
      {"bind",
       // The value is a whitespace-separated (space or tab) list of addresses.
       [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k, const std::string &v) -> Status {
         binds = util::Split(v, " \t");
         return Status::OK();
       }},
      {"maxclients",
       [](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
         if (!srv) return Status::OK();
         srv->AdjustOpenFilesLimit();
         return Status::OK();
       }},
      {"slaveof", replicaof_cb},
      {"replicaof", replicaof_cb},
      {"profiling-sample-commands",
       // Comma-separated command names; a "*" entry means "sample everything"
       // and makes the explicit list irrelevant.
       [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k, const std::string &v) -> Status {
         std::vector<std::string> cmds = util::Split(v, ",");
         profiling_sample_all_commands = false;
         profiling_sample_commands.clear();
         for (auto const &cmd : cmds) {
           if (cmd == "*") {
             profiling_sample_all_commands = true;
             profiling_sample_commands.clear();
             return Status::OK();
           }
           if (!redis::CommandTable::IsExists(cmd)) {
             return {Status::NotOK, cmd + " is not Kvrocks supported command"};
           }
           // profiling_sample_commands use command's original name, regardless of rename-command directive
           profiling_sample_commands.insert(cmd);
         }
         return Status::OK();
       }},
      {"slowlog-max-len",
       [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
         if (!srv) return Status::OK();
         srv->GetSlowLog()->SetMaxEntries(slowlog_max_len);
         return Status::OK();
       }},
      {"max-db-size",
       [](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
         if (!srv) return Status::OK();
         srv->storage->CheckDBSizeLimit();
         return Status::OK();
       }},
      {"max-io-mb",
       [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
         if (!srv) return Status::OK();
         srv->storage->SetIORateLimit(max_io_mb);
         return Status::OK();
       }},
      {"profiling-sample-record-max-len",
       [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
         if (!srv) return Status::OK();
         srv->GetPerfLog()->SetMaxEntries(profiling_sample_record_max_len);
         return Status::OK();
       }},
      // The migrate-* options are pushed to the slot migrator only when cluster
      // mode is enabled.
      {"migrate-speed",
       [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
         if (!srv) return Status::OK();
         if (cluster_enabled) srv->slot_migrator->SetMaxMigrationSpeed(migrate_speed);
         return Status::OK();
       }},
      {"migrate-pipeline-size",
       [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
         if (!srv) return Status::OK();
         if (cluster_enabled) srv->slot_migrator->SetMaxPipelineSize(pipeline_size);
         return Status::OK();
       }},
      {"migrate-sequence-gap",
       [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
         if (!srv) return Status::OK();
         if (cluster_enabled) srv->slot_migrator->SetSequenceGapLimit(sequence_gap);
         return Status::OK();
       }},
      {"migrate-batch-rate-limit-mb",
       [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
         if (!srv) return Status::OK();
         // Same cluster_enabled guard as the other migrate-* callbacks;
         // presumably slot_migrator is only created in cluster mode - confirm
         // against Server.
         if (cluster_enabled) srv->slot_migrator->SetMigrateBatchRateLimit(migrate_batch_rate_limit_mb * MiB);
         return Status::OK();
       }},
      {"migrate-batch-size-kb",
       [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
         if (!srv) return Status::OK();
         // Same cluster_enabled guard as the other migrate-* callbacks;
         // presumably slot_migrator is only created in cluster mode - confirm
         // against Server.
         if (cluster_enabled) srv->slot_migrator->SetMigrateBatchSize(migrate_batch_size_kb * KiB);
         return Status::OK();
       }},
      {"log-level",
       [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
         if (!srv) return Status::OK();
         spdlog::set_level(log_level);
         return Status::OK();
       }},
      {"persist-cluster-nodes-enabled",
       [this](Server *srv, [[maybe_unused]] const std::string &k, const std::string &v) -> Status {
         if (!srv || !cluster_enabled) return Status::OK();
         auto nodes_file_path = NodesFilePath();
         if (v == "yes") {
           return srv->cluster->DumpClusterNodes(nodes_file_path);
         }
         // Remove the cluster nodes file to avoid stale cluster nodes info.
         // Best effort: the result of remove() is intentionally not checked.
         remove(nodes_file_path.data());
         return Status::OK();
       }},
      {"repl-namespace-enabled",
       [](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
         if (!srv) return Status::OK();
         return srv->GetNamespace()->LoadAndRewrite();
       }},
      // Several rocksdb.* options below ignore the raw value string and send
      // the already-parsed config member instead, scaled by MiB where the
      // callback multiplies by it.
      {"rocksdb.target_file_size_base",
       [this](Server *srv, const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
         if (!srv) return Status::OK();
         return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
                                                            std::to_string(rocks_db.target_file_size_base * MiB));
       }},
      {"rocksdb.write_buffer_size",
       [this](Server *srv, const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
         if (!srv) return Status::OK();
         return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
                                                            std::to_string(rocks_db.write_buffer_size * MiB));
       }},
      {"rocksdb.disable_auto_compactions",
       // The "yes"/"no" config value is translated to "true"/"false".
       [](Server *srv, const std::string &k, const std::string &v) -> Status {
         if (!srv) return Status::OK();
         std::string disable_auto_compactions = v == "yes" ? "true" : "false";
         return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), disable_auto_compactions);
       }},
      {"rocksdb.max_total_wal_size",
       [this](Server *srv, const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
         if (!srv) return Status::OK();
         return srv->storage->SetDBOption(TrimRocksDbPrefix(k), std::to_string(rocks_db.max_total_wal_size * MiB));
       }},
      {"rocksdb.enable_blob_files",
       [this](Server *srv, const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
         if (!srv) return Status::OK();
         std::string enable_blob_files = rocks_db.enable_blob_files ? "true" : "false";
         return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), enable_blob_files);
       }},
      // The remaining blob options are rejected with errBlobDbNotEnabled unless
      // rocks_db.enable_blob_files is set.
      {"rocksdb.min_blob_size",
       [this](Server *srv, const std::string &k, const std::string &v) -> Status {
         if (!srv) return Status::OK();
         if (!rocks_db.enable_blob_files) {
           return {Status::NotOK, errBlobDbNotEnabled};
         }
         return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), v);
       }},
      {"rocksdb.blob_file_size",
       [this](Server *srv, const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
         if (!srv) return Status::OK();
         if (!rocks_db.enable_blob_files) {
           return {Status::NotOK, errBlobDbNotEnabled};
         }
         return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
                                                            std::to_string(rocks_db.blob_file_size));
       }},
      {"rocksdb.enable_blob_garbage_collection",
       [this](Server *srv, const std::string &k, const std::string &v) -> Status {
         if (!srv) return Status::OK();
         if (!rocks_db.enable_blob_files) {
           return {Status::NotOK, errBlobDbNotEnabled};
         }
         std::string enable_blob_garbage_collection = v == "yes" ? "true" : "false";
         return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), enable_blob_garbage_collection);
       }},
      {"rocksdb.blob_garbage_collection_age_cutoff",
       // The config value is an integer percentage in [0, 100]; it is sent on
       // as a fraction (value / 100.0).
       [this](Server *srv, const std::string &k, const std::string &v) -> Status {
         if (!srv) return Status::OK();
         if (!rocks_db.enable_blob_files) {
           return {Status::NotOK, errBlobDbNotEnabled};
         }
         auto parse_result = ParseInt<int>(v, 10);
         if (!parse_result) {
           return {Status::NotOK, "Illegal blob_garbage_collection_age_cutoff value."};
         }
         const int percent = *parse_result;
         if (percent < 0 || percent > 100) {
           return {Status::NotOK, "blob_garbage_collection_age_cutoff must >= 0 and <= 100."};
         }
         const double cutoff = percent / 100.0;
         return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), std::to_string(cutoff));
       }},
      {"rocksdb.level_compaction_dynamic_level_bytes",
       [](Server *srv, const std::string &k, const std::string &v) -> Status {
         if (!srv) return Status::OK();
         std::string level_compaction_dynamic_level_bytes = v == "yes" ? "true" : "false";
         return srv->storage->SetDBOption(TrimRocksDbPrefix(k), level_compaction_dynamic_level_bytes);
       }},
      // The two max_bytes_for_level_* options are rejected with
      // errLevelCompactionDynamicLevelBytesNotSet unless
      // rocks_db.level_compaction_dynamic_level_bytes is set.
      {"rocksdb.max_bytes_for_level_base",
       [this](Server *srv, const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
         if (!srv) return Status::OK();
         if (!rocks_db.level_compaction_dynamic_level_bytes) {
           return {Status::NotOK, errLevelCompactionDynamicLevelBytesNotSet};
         }
         return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k),
                                                            std::to_string(rocks_db.max_bytes_for_level_base));
       }},
      {"rocksdb.max_bytes_for_level_multiplier",
       [this](Server *srv, const std::string &k, const std::string &v) -> Status {
         if (!srv) return Status::OK();
         if (!rocks_db.level_compaction_dynamic_level_bytes) {
           return {Status::NotOK, errLevelCompactionDynamicLevelBytesNotSet};
         }
         return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), v);
       }},
      {"rocksdb.max_open_files", set_db_option_cb},
      {"rocksdb.stats_dump_period_sec", set_db_option_cb},
      {"rocksdb.delayed_write_rate", set_db_option_cb},
      {"rocksdb.max_background_compactions", set_db_option_cb},
      {"rocksdb.max_background_flushes", set_db_option_cb},
      {"rocksdb.max_subcompactions", set_db_option_cb},
      {"rocksdb.compaction_readahead_size", set_db_option_cb},
      {"rocksdb.max_background_jobs", set_db_option_cb},
      {"rocksdb.max_compaction_bytes", set_cf_option_cb},
      {"rocksdb.max_write_buffer_number", set_cf_option_cb},
      {"rocksdb.level0_slowdown_writes_trigger", set_cf_option_cb},
      {"rocksdb.level0_stop_writes_trigger", set_cf_option_cb},
      {"rocksdb.level0_file_num_compaction_trigger", set_cf_option_cb},
      {"rocksdb.compression", set_compression_cb},
      {"rocksdb.compression_start_level", set_compression_cb},
#ifdef ENABLE_OPENSSL
      {"tls-cert-file", set_tls_option},
      {"tls-key-file", set_tls_option},
      {"tls-key-file-pass", set_tls_option},
      {"tls-ca-cert-file", set_tls_option},
      {"tls-ca-cert-dir", set_tls_option},
      {"tls-protocols", set_tls_option},
      {"tls-auth-clients", set_tls_option},
      {"tls-ciphers", set_tls_option},
      {"tls-ciphersuites", set_tls_option},
      {"tls-prefer-server-ciphers", set_tls_option},
      {"tls-session-caching", set_tls_option},
      {"tls-session-cache-size", set_tls_option},
      {"tls-session-cache-timeout", set_tls_option},
#endif
      {"histogram-bucket-boundaries",
       // Comma-separated, ascending list of numeric bucket boundaries. The
       // member is cleared up front and only refilled once the complete list
       // has been parsed and verified as sorted, so a rejected value never
       // leaves a partial or unsorted list behind.
       [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k, const std::string &v) -> Status {
         std::vector<std::string> buckets = util::Split(v, ",");
         histogram_bucket_boundaries.clear();
         if (buckets.empty()) {
           return Status::OK();
         }
         decltype(histogram_bucket_boundaries) parsed_boundaries;
         for (const auto &bucket_val : buckets) {
           auto parse_result = ParseFloat<double>(bucket_val);
           if (!parse_result) {
             return {Status::NotOK, "The values in the bucket list must be double or integer."};
           }
           parsed_boundaries.push_back(*parse_result);
         }
         if (!std::is_sorted(parsed_boundaries.begin(), parsed_boundaries.end())) {
           return {Status::NotOK, "The values for the histogram must be sorted."};
         }
         histogram_bucket_boundaries = std::move(parsed_boundaries);
         return Status::OK();
       }},
  };

  // Attach each callback to its field; names without a registered field are
  // ignored.
  for (const auto &[name, callback] : callbacks) {
    if (auto field_iter = fields_.find(name); field_iter != fields_.end()) {
      field_iter->second->callback = callback;
    }
  }
}