src/server/pegasus_server_impl.h (379 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <gtest/gtest_prod.h>
#include <rocksdb/compression_type.h>
#include <rocksdb/options.h>
#include <rocksdb/slice.h>
#include <rocksdb/table.h>
#include <rrdb/rrdb_types.h>
#include <stdint.h>
#include <atomic>
#include <chrono>
#include <deque>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "bulk_load_types.h"
#include "common/gpid.h"
#include "metadata_types.h"
#include "pegasus_manual_compact_service.h"
#include "pegasus_read_service.h"
#include "pegasus_scan_context.h"
#include "pegasus_utils.h"
#include "pegasus_value_schema.h"
#include "range_read_limiter.h"
#include "replica/replication_app_base.h"
#include "task/task.h"
#include "task/task_tracker.h"
#include "utils/error_code.h"
#include "utils/flags.h"
#include "utils/metrics.h"
#include "utils/rand.h"
#include "utils/synchronize.h"
DSN_DECLARE_uint64(rocksdb_abnormal_batch_get_bytes_threshold);
DSN_DECLARE_uint64(rocksdb_abnormal_batch_get_count_threshold);
DSN_DECLARE_uint64(rocksdb_abnormal_get_size_threshold);
DSN_DECLARE_uint64(rocksdb_abnormal_multi_get_iterate_count_threshold);
DSN_DECLARE_uint64(rocksdb_abnormal_multi_get_size_threshold);
namespace pegasus {
namespace server {
class KeyWithTTLCompactionFilterFactory;
} // namespace server
} // namespace pegasus
namespace rocksdb {
class Cache;
class ColumnFamilyHandle;
class DB;
class RateLimiter;
class Statistics;
class WriteBufferManager;
} // namespace rocksdb
namespace dsn {
class blob;
class message_ex;
class rpc_address;
namespace replication {
class detect_hotkey_request;
class detect_hotkey_response;
class learn_state;
class replica;
} // namespace replication
namespace utils {
class token_bucket_throttling_controller;
} // namespace utils
} // namespace dsn
// Shorthand for the token-bucket throttling controller forward-declared in dsn::utils.
using throttling_controller = dsn::utils::token_bucket_throttling_controller;
namespace pegasus {
namespace server {
class capacity_unit_calculator;
class hotkey_collector;
class meta_store;
class pegasus_server_write;
// Result of examining a single key/value pair during a range read; it is the return
// type of validate_key_value_for_scan() and append_key_value_for_multi_get().
// NOTE(review): the per-state meanings below are inferred from the enumerator names and
// from the TTL / filter / hash-validation parameters of those functions -- confirm
// against the implementation.
enum class range_iteration_state
{
    kNormal = 1,     // presumably: the record passed every check
    kExpired = 2,    // presumably: the record's TTL has elapsed
    kFiltered = 3,   // presumably: rejected by a hash-key or sort-key filter
    kHashInvalid = 4 // presumably: rejected by partition-hash validation
};
class pegasus_server_impl : public pegasus_read_service
{
public:
// Registers this class under the storage-engine name "pegasus", using
// replication_app_base::create<> as its factory, and then calls register_rpc_handlers().
static void register_service()
{
replication_app_base::register_storage_engine(
"pegasus", replication_app_base::create<pegasus::server::pegasus_server_impl>);
register_rpc_handlers();
}
// Takes a pointer to a replica; `explicit` rules out implicit conversion from that pointer.
explicit pegasus_server_impl(dsn::replication::replica *r);
~pegasus_server_impl() override;
// Read RPC handlers (get, multi_get, batch_get, sortkey_count, ttl and the scanner
// family); every one of them is declared `override`.
// the following methods may set physical error if internal error occurs
void on_get(get_rpc rpc) override;
void on_multi_get(multi_get_rpc rpc) override;
void on_batch_get(batch_get_rpc rpc) override;
void on_sortkey_count(sortkey_count_rpc rpc) override;
void on_ttl(ttl_rpc rpc) override;
void on_get_scanner(get_scanner_rpc rpc) override;
void on_scan(scan_rpc rpc) override;
void on_clear_scanner(const int64_t &args) override;
// Opens (or re-opens) the db.
// input:
// - argc = 0 : re-open the db
// - argc = 2n + 1, n >= 0; normal open the db
// returns:
// - ERR_OK
// - ERR_FILE_OPERATION_FAILED
// - ERR_LOCAL_APP_FAILURE
::dsn::error_code start(int argc, char **argv) override;
// NOTE(review): presumably cancels storage-engine background work, blocking until it has
// stopped when `wait` is true -- confirm in the implementation.
void cancel_background_work(bool wait);
// returns:
// - ERR_OK
// - ERR_FILE_OPERATION_FAILED
::dsn::error_code stop(bool clear_state) override;
// NOTE(review): presumably converts `request` into an idempotent request stored in
// `*new_request`; the meaning of the int result is not visible in this header.
int make_idempotent(dsn::message_ex *request, dsn::message_ex **new_request) override;
/// Each of the write request (specifically, the rpc that's configured as write, see
/// option `rpc_request_is_write_operation` in rDSN `task_spec`) will first be
/// replicated to the replicas through the underlying PacificA protocol in rDSN, and
/// after being committed, the mutation will be applied into rocksdb by this function.
///
/// `requests` points to `count` request pointers.
/// NOTE(review): the exact roles of `decree`, `timestamp` and `original_request`, and
/// the meaning of the int result, are not visible in this header -- see the base-class
/// documentation referenced below.
///
/// \see dsn::replication::replication_app_base::apply_mutation
/// \inherit dsn::replication::replication_app_base
int on_batched_write_requests(int64_t decree,
uint64_t timestamp,
dsn::message_ex **requests,
int count,
dsn::message_ex *original_request) override;
// Nothing to prepare for this engine: `learn_req` is left untouched and ERR_OK is
// always returned.
::dsn::error_code prepare_get_checkpoint(dsn::blob &learn_req) override
{
return ::dsn::ERR_OK;
}
// returns:
// - ERR_OK: checkpoint succeed
// - ERR_WRONG_TIMING: another checkpoint is running now
// - ERR_LOCAL_APP_FAILURE: some internal failure
// - ERR_FILE_OPERATION_FAILED: some file failure
// ATTENTION: make sure that no other thread is writing into the replica.
::dsn::error_code sync_checkpoint() override;
// returns:
// - ERR_OK: checkpoint succeed
// - ERR_WRONG_TIMING: another checkpoint is running now
// - ERR_LOCAL_APP_FAILURE: some internal failure
// - ERR_FILE_OPERATION_FAILED: some file failure
// - ERR_TRY_AGAIN: flush memtable triggered, need try again later
::dsn::error_code async_checkpoint(bool flush_memtable) override;
//
// copy the latest checkpoint to checkpoint_dir, and the decree of the checkpoint
// copied will be assigned to last_decree if last_decree is not null.
// if checkpoint_dir already exists, this function will delete it first.
//
// must be thread safe
// this method will not trigger flush(), just copy even if the app is empty.
// NOTE(review): the `flush_memtable` parameter (default false) suggests a flush can be
// requested explicitly, which the sentence above does not mention -- confirm.
::dsn::error_code copy_checkpoint_to_dir(const char *checkpoint_dir,
/*output*/ int64_t *last_decree,
bool flush_memtable = false) override;
//
// help function, just copy checkpoint to specified dir and ignore _is_checkpointing.
// if checkpoint_dir already exists, this function will delete it first.
// `checkpoint_decree` is an output parameter.
::dsn::error_code copy_checkpoint_to_dir_unsafe(const char *checkpoint_dir,
/**output*/ int64_t *checkpoint_decree,
bool flush_memtable = false);
// get the last checkpoint
// if succeed:
// - the checkpoint files path are put into "state.files"
// - the checkpoint_info are serialized into "state.meta"
// - the "state.from_decree_excluded" and "state.to_decree_excluded" are set properly
//   NOTE(review): verify the field name "to_decree_excluded" against the definition of
//   dsn::replication::learn_state.
// returns:
// - ERR_OK
// - ERR_OBJECT_NOT_FOUND
// - ERR_FILE_OPERATION_FAILED
::dsn::error_code get_checkpoint(int64_t learn_start,
const dsn::blob &learn_request,
dsn::replication::learn_state &state) override;
// apply checkpoint, this will clear and recreate the db
// if succeed:
// - last_committed_decree() == last_durable_decree()
// returns:
// - ERR_OK
// - ERR_FILE_OPERATION_FAILED
// - error code of close()
// - error code of open()
// - error code of checkpoint()
// `mode` selects how the checkpoint described by `state` is applied.
// NOTE(review): the available chkpt_apply_mode values are not visible in this header.
::dsn::error_code storage_apply_checkpoint(chkpt_apply_mode mode,
const dsn::replication::learn_state &state) override;
int64_t last_flushed_decree() const override;
// Returns the current value of the atomic _last_durable_decree.
int64_t last_durable_decree() const override { return _last_durable_decree.load(); }
// App-env accessors: update_app_envs() receives a map of envs, query_app_envs() fills
// its output map.
void update_app_envs(const std::map<std::string, std::string> &envs) override;
void query_app_envs(/*out*/ std::map<std::string, std::string> &envs) override;
void set_partition_version(int32_t partition_version) override;
std::string dump_write_request(dsn::message_ex *request) override;
// Not thread-safe
void set_ingestion_status(dsn::replication::ingestion_status::type status) override;
// Returns _ingestion_status as stored; no synchronization is applied here.
dsn::replication::ingestion_status::type get_ingestion_status() override
{
return _ingestion_status;
}
private:
// Test classes and individual gtest cases that are granted access to private members.
friend class manual_compact_service_test;
friend class pegasus_compression_options_test;
friend class pegasus_server_impl_test;
friend class hotkey_collector_test;
FRIEND_TEST(pegasus_server_impl_test, default_data_version);
FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_latest_options);
FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_app_envs);
FRIEND_TEST(pegasus_server_impl_test, test_stop_db_twice);
FRIEND_TEST(pegasus_server_impl_test, test_update_user_specified_compaction);
// Non-test classes that are granted access to private members.
friend class pegasus_manual_compact_service;
friend class pegasus_write_service;
friend class rocksdb_wrapper;
// parse checkpoint directories in the data dir
// checkpoint directory format is: "checkpoint.{decree}"
void parse_checkpoints();
// garbage-collect checkpoints
// if force_reserve_one == true, then only the last checkpoint is reserved
void gc_checkpoints(bool force_reserve_one = false);
// Stores `decree` into the atomic _last_durable_decree.
void set_last_durable_decree(int64_t decree) { _last_durable_decree.store(decree); }
// Appends an entry for `key` / `value` to `kvs`.
// NOTE(review): `no_value` presumably omits the value payload and `request_expire_ts`
// presumably adds the expire timestamp to the entry -- confirm in the implementation.
void append_key_value(std::vector<::dsn::apps::key_value> &kvs,
const rocksdb::Slice &key,
const rocksdb::Slice &value,
bool no_value,
bool request_expire_ts);
// Classifies one key/value pair for a scan and returns the resulting
// range_iteration_state.
// NOTE(review): presumably checks expiration against `epoch_now`, applies the hash-key
// and sort-key filters, and validates the partition hash when `request_validate_hash`
// is true -- confirm in the implementation.
range_iteration_state
validate_key_value_for_scan(const rocksdb::Slice &key,
const rocksdb::Slice &value,
::dsn::apps::filter_type::type hash_key_filter_type,
const ::dsn::blob &hash_key_filter_pattern,
::dsn::apps::filter_type::type sort_key_filter_type,
const ::dsn::blob &sort_key_filter_pattern,
uint32_t epoch_now,
bool request_validate_hash);
// Multi-get counterpart: takes the output vector `kvs` together with a sort-key filter
// and returns a range_iteration_state.
// NOTE(review): presumably appends to `kvs` only when the pair is neither expired nor
// filtered out -- confirm in the implementation.
range_iteration_state
append_key_value_for_multi_get(std::vector<::dsn::apps::key_value> &kvs,
const rocksdb::Slice &key,
const rocksdb::Slice &value,
::dsn::apps::filter_type::type sort_key_filter_type,
const ::dsn::blob &sort_key_filter_pattern,
uint32_t epoch_now,
bool no_value);
// return true if the filter type is supported, i.e. it lies within the inclusive range
// [FT_NO_FILTER, FT_MATCH_POSTFIX]
bool is_filter_type_supported(::dsn::apps::filter_type::type filter_type)
{
    // Reject anything below the first known filter type.
    if (filter_type < ::dsn::apps::filter_type::FT_NO_FILTER) {
        return false;
    }
    // Otherwise it is supported as long as it does not exceed the last known one.
    return filter_type <= ::dsn::apps::filter_type::FT_MATCH_POSTFIX;
}
// return true if the data is valid for the filter
// `value` is the data under test; `filter_type` and `filter_pattern` describe the filter.
static bool validate_filter(::dsn::apps::filter_type::type filter_type,
const ::dsn::blob &filter_pattern,
const ::dsn::blob &value);
// NOTE(review): presumably refresh the replica-level and server-level rocksdb metrics
// declared at the end of this class; the server-level variant is static -- confirm.
void update_replica_rocksdb_statistics();
static void update_server_rocksdb_statistics();
// get the absolute path of restore directory and the flag whether force restore from env
// return
// std::pair<std::string, bool>, pair.first is the path of the restore dir; pair.second is
// the flag that whether force restore
std::pair<std::string, bool>
get_restore_dir_from_env(const std::map<std::string, std::string> &env_kvs);
// The helpers below all receive the app-env map.
// NOTE(review): presumably each one applies the single setting named in its identifier,
// with the *_before_* variants intended for use ahead of opening/creating the db --
// confirm in the implementation.
void update_app_envs_before_open_db(const std::map<std::string, std::string> &envs);
void update_usage_scenario(const std::map<std::string, std::string> &envs);
void update_default_ttl(const std::map<std::string, std::string> &envs);
void update_checkpoint_reserve(const std::map<std::string, std::string> &envs);
void update_slow_query_threshold(const std::map<std::string, std::string> &envs);
void update_rocksdb_iteration_threshold(const std::map<std::string, std::string> &envs);
void update_rocksdb_block_cache_enabled(const std::map<std::string, std::string> &envs);
void update_validate_partition_hash(const std::map<std::string, std::string> &envs);
void update_user_specified_compaction(const std::map<std::string, std::string> &envs);
void update_rocksdb_dynamic_options(const std::map<std::string, std::string> &envs);
void set_rocksdb_options_before_creating(const std::map<std::string, std::string> &envs);
void update_throttling_controller(const std::map<std::string, std::string> &envs);
// NOTE(review): the meaning of the bool result (parsed flag value vs. parse success) is
// not visible in this header.
bool parse_allow_ingest_behind(const std::map<std::string, std::string> &envs);
// return true if parsing compression types from 'config' succeeds, otherwise return false.
// 'compression_per_level' will not be changed if parsing fails.
bool parse_compression_types(const std::string &config,
std::vector<rocksdb::CompressionType> &compression_per_level);
// Conversions between a compression name and rocksdb::CompressionType.
// NOTE(review): compression_str_to_type presumably returns false for an unrecognised
// name and writes `type` only on success -- confirm in the implementation.
bool compression_str_to_type(const std::string &compression_str,
rocksdb::CompressionType &type);
std::string compression_type_to_str(rocksdb::CompressionType type);
// return finish time recorded in rocksdb
uint64_t do_manual_compact(const rocksdb::CompactRangeOptions &options);
// generate new checkpoint and remove old checkpoints, in order to release storage asap
// return true if release succeeds (new checkpoint generated).
bool release_storage_after_manual_compact();
std::string query_compact_state() const override;
// return true if successfully changed
bool set_usage_scenario(const std::string &usage_scenario);
// recalculate option value if necessary
void recalculate_data_cf_options(const rocksdb::ColumnFamilyOptions &cur_data_cf_opts);
// The reset_* helpers take read-only base options and write through the `target_*`
// pointers.
// NOTE(review): presumably they copy the relevant fields from base to target, with
// `envs` supplying overrides where it is a parameter -- confirm in the implementation.
void reset_usage_scenario_options(const rocksdb::ColumnFamilyOptions &base_opts,
rocksdb::ColumnFamilyOptions *target_opts);
void reset_allow_ingest_behind_option(const rocksdb::DBOptions &base_db_opt,
const std::map<std::string, std::string> &envs,
rocksdb::DBOptions *target_db_opt);
void reset_rocksdb_options(const rocksdb::ColumnFamilyOptions &base_cf_opts,
const rocksdb::DBOptions &base_db_opt,
const std::map<std::string, std::string> &envs,
rocksdb::ColumnFamilyOptions *target_cf_opts,
rocksdb::DBOptions *target_db_opt);
// return true if successfully set
bool set_options(const std::unordered_map<std::string, std::string> &new_options);
// return random value in range of [0.75,1.25] * base_value
//
// The upper bound saturates at UINT64_MAX. For a base_value close to the maximum,
// `base_value + gap` would otherwise wrap around in unsigned arithmetic and produce an
// upper bound that is smaller than the lower bound.
uint64_t get_random_nearby(uint64_t base_value)
{
    const uint64_t gap = base_value / 4;
    // Cannot underflow: gap is at most base_value.
    const uint64_t lower = base_value - gap;
    // Check for overflow before adding instead of after.
    const uint64_t upper = (gap > UINT64_MAX - base_value) ? UINT64_MAX : base_value + gap;
    return dsn::rand::next_u64(lower, upper);
}
// return true if value in range of [0.75, 1.25] * base_value
//
// The tolerance is base_value / 4 using integer division, so small base values have a
// tolerance of zero and only match themselves.
bool check_value_if_nearby(uint64_t base_value, uint64_t check_value)
{
    const uint64_t tolerance = base_value / 4;
    // Subtract the smaller value from the larger one so the unsigned difference
    // never wraps.
    if (check_value > base_value) {
        return check_value - base_value <= tolerance;
    }
    return base_value - check_value <= tolerance;
}
// return true if expired
//
// Thin wrapper over pegasus::check_if_record_expired() that supplies
// _pegasus_data_version and passes the raw value as a string view.
bool check_if_record_expired(uint32_t epoch_now, rocksdb::Slice raw_value)
{
    const auto raw_view = utils::to_string_view(raw_value);
    return pegasus::check_if_record_expired(_pegasus_data_version, epoch_now, raw_view);
}
// Returns true when a multi_get should be treated as abnormal, i.e. when at least one of
// the following holds:
// - the size threshold flag is non-zero and `size` reaches it;
// - the iterate-count threshold flag is non-zero and `iterate_count` reaches it;
// - `time_used` reaches _slow_query_threshold_ns.
bool is_multi_get_abnormal(uint64_t time_used, uint64_t size, uint64_t iterate_count)
{
    // A threshold flag of 0 disables the corresponding check.
    const bool size_exceeded = FLAGS_rocksdb_abnormal_multi_get_size_threshold > 0 &&
                               size >= FLAGS_rocksdb_abnormal_multi_get_size_threshold;
    const bool iterations_exceeded =
        FLAGS_rocksdb_abnormal_multi_get_iterate_count_threshold > 0 &&
        iterate_count >= FLAGS_rocksdb_abnormal_multi_get_iterate_count_threshold;
    const bool too_slow = time_used >= _slow_query_threshold_ns;
    return size_exceeded || iterations_exceeded || too_slow;
}
// Returns true when a batch_get should be treated as abnormal, i.e. when at least one of
// the following holds:
// - the bytes threshold flag is non-zero and `size` reaches it;
// - the count threshold flag is non-zero and `count` reaches it;
// - `time_used` reaches _slow_query_threshold_ns.
bool is_batch_get_abnormal(uint64_t time_used, uint64_t size, uint64_t count)
{
    // A threshold flag of 0 disables the corresponding check.
    const bool bytes_exceeded = FLAGS_rocksdb_abnormal_batch_get_bytes_threshold > 0 &&
                                size >= FLAGS_rocksdb_abnormal_batch_get_bytes_threshold;
    const bool count_exceeded = FLAGS_rocksdb_abnormal_batch_get_count_threshold > 0 &&
                                count >= FLAGS_rocksdb_abnormal_batch_get_count_threshold;
    const bool too_slow = time_used >= _slow_query_threshold_ns;
    return bytes_exceeded || count_exceeded || too_slow;
}
// Returns true when a get should be treated as abnormal: either the value-size threshold
// flag is non-zero and `value_size` reaches it, or `time_used` reaches
// _slow_query_threshold_ns.
bool is_get_abnormal(uint64_t time_used, uint64_t value_size)
{
    // A threshold flag of 0 disables the size check.
    const bool value_too_large = FLAGS_rocksdb_abnormal_get_size_threshold > 0 &&
                                 value_size >= FLAGS_rocksdb_abnormal_get_size_threshold;
    const bool too_slow = time_used >= _slow_query_threshold_ns;
    return value_too_large || too_slow;
}
// `missing_meta_cf` and `miss_data_cf` are pointer parameters used for output.
// NOTE(review): presumably inspects the db at `path` and reports which of the meta/data
// column families are absent -- confirm in the implementation.
::dsn::error_code
check_column_families(const std::string &path, bool *missing_meta_cf, bool *miss_data_cf);
void release_db();
// NOTE(review): presumably flushes every column family, blocking when `wait` is true.
::dsn::error_code flush_all_family_columns(bool wait);
void on_detect_hotkey(const dsn::replication::detect_hotkey_request &req,
dsn::replication::detect_hotkey_response &resp) override;
uint32_t query_data_version() const override;
dsn::replication::manual_compaction_status::type query_compact_status() const override;
// Log expired keys for verbose mode.
// Three overloads: separate hash key and sort key blobs, a single key blob, or a single
// rocksdb::Slice key.
void log_expired_data(const char *op,
const dsn::rpc_address &addr,
const dsn::blob &hash_key,
const dsn::blob &sort_key) const;
void log_expired_data(const char *op, const dsn::rpc_address &addr, const dsn::blob &key) const;
void
log_expired_data(const char *op, const dsn::rpc_address &addr, const rocksdb::Slice &key) const;
private:
static const std::chrono::seconds kServerStatUpdateTimeSec;
static const std::string COMPRESSION_HEADER;
dsn::gpid _gpid;
std::string _primary_host_port;
// slow query time threshold. exceed this threshold will be logged.
uint64_t _slow_query_threshold_ns;
range_read_limiter_options _rng_rd_opts;
std::shared_ptr<KeyWithTTLCompactionFilterFactory> _key_ttl_compaction_filter_factory;
std::shared_ptr<rocksdb::Statistics> _statistics;
rocksdb::DBOptions _db_opts;
// The value of option in data_cf according to conf template file config.ini
rocksdb::ColumnFamilyOptions _data_cf_opts;
// Dynamically calculate the value of current data_cf option according to the conf module file
// and usage scenario
rocksdb::ColumnFamilyOptions _table_data_cf_opts;
rocksdb::BlockBasedTableOptions _tbl_opts;
rocksdb::ColumnFamilyOptions _meta_cf_opts;
rocksdb::ReadOptions _data_cf_rd_opts;
std::string _usage_scenario;
std::string _user_specified_compaction;
// Whether it is necessary to update the current data_cf, it is required when opening the db at
// the first time, but not later
bool _table_data_cf_opts_recalculated;
// Raw pointers to the db and to its data / meta column family handles.
// NOTE(review): ownership and release presumably happen in release_db() -- confirm.
rocksdb::DB *_db;
rocksdb::ColumnFamilyHandle *_data_cf;
rocksdb::ColumnFamilyHandle *_meta_cf;
// Static members: one instance shared by every pegasus_server_impl object.
static std::shared_ptr<rocksdb::Cache> _s_block_cache;
static std::shared_ptr<rocksdb::WriteBufferManager> _s_write_buffer_manager;
static std::shared_ptr<rocksdb::RateLimiter> _s_rate_limiter;
static int64_t _rocksdb_limiter_last_total_through;
// NOTE(review): `volatile` does not provide inter-thread synchronization; if _is_open is
// read and written from different threads, std::atomic<bool> would be the appropriate
// type -- confirm the access pattern in the implementation.
volatile bool _is_open;
uint32_t _pegasus_data_version;
std::atomic<int64_t> _last_durable_decree;
std::unique_ptr<meta_store> _meta_store;
std::unique_ptr<capacity_unit_calculator> _cu_calculator;
std::unique_ptr<pegasus_server_write> _server_write;
uint32_t _checkpoint_reserve_min_count;
uint32_t _checkpoint_reserve_time_seconds;
std::atomic_bool _is_checkpointing; // whether the db is doing checkpoint
::dsn::utils::ex_lock_nr _checkpoints_lock; // protects the following checkpoints deque
std::deque<int64_t> _checkpoints; // ordered checkpoints
pegasus_context_cache _context_cache;
::dsn::task_ptr _update_replica_rdb_stat;
static ::dsn::task_ptr _update_server_rdb_stat;
pegasus_manual_compact_service _manual_compact_svc;
std::atomic<int32_t> _partition_version;
bool _validate_partition_hash{false};
// Starts out as IS_INVALID; read and written through get/set_ingestion_status().
dsn::replication::ingestion_status::type _ingestion_status{
dsn::replication::ingestion_status::IS_INVALID};
dsn::task_tracker _tracker;
std::shared_ptr<hotkey_collector> _read_hotkey_collector;
std::shared_ptr<hotkey_collector> _write_hotkey_collector;
std::shared_ptr<throttling_controller> _read_size_throttling_controller;
// Request counters, one per read request type.
METRIC_VAR_DECLARE_counter(get_requests);
METRIC_VAR_DECLARE_counter(multi_get_requests);
METRIC_VAR_DECLARE_counter(batch_get_requests);
METRIC_VAR_DECLARE_counter(scan_requests);
// Latency percentiles (nanoseconds, per the _ns suffix), one per read request type.
METRIC_VAR_DECLARE_percentile_int64(get_latency_ns);
METRIC_VAR_DECLARE_percentile_int64(multi_get_latency_ns);
METRIC_VAR_DECLARE_percentile_int64(batch_get_latency_ns);
METRIC_VAR_DECLARE_percentile_int64(scan_latency_ns);
// Counters for expired / filtered values and for abnormal or throttled read requests.
METRIC_VAR_DECLARE_counter(read_expired_values);
METRIC_VAR_DECLARE_counter(read_filtered_values);
METRIC_VAR_DECLARE_counter(abnormal_read_requests);
METRIC_VAR_DECLARE_counter(throttling_rejected_read_requests);
// Server-level metrics for rocksdb.
// These pass `static` as an extra macro argument, unlike the replica-level ones below.
METRIC_VAR_DECLARE_gauge_int64(rdb_block_cache_mem_usage_bytes, static);
METRIC_VAR_DECLARE_gauge_int64(rdb_wbm_total_mem_usage_bytes, static);
METRIC_VAR_DECLARE_gauge_int64(rdb_wbm_mutable_mem_usage_bytes, static);
METRIC_VAR_DECLARE_gauge_int64(rdb_write_rate_limiter_through_bytes_per_sec, static);
// Replica-level metrics for rocksdb.
METRIC_VAR_DECLARE_gauge_int64(rdb_total_sst_files);
METRIC_VAR_DECLARE_gauge_int64(rdb_total_sst_size_mb);
METRIC_VAR_DECLARE_gauge_int64(rdb_estimated_keys);
METRIC_VAR_DECLARE_gauge_int64(rdb_index_and_filter_blocks_mem_usage_bytes);
METRIC_VAR_DECLARE_gauge_int64(rdb_memtable_mem_usage_bytes);
METRIC_VAR_DECLARE_gauge_int64(rdb_block_cache_hit_count);
METRIC_VAR_DECLARE_gauge_int64(rdb_block_cache_total_count);
METRIC_VAR_DECLARE_gauge_int64(rdb_memtable_hit_count);
METRIC_VAR_DECLARE_gauge_int64(rdb_memtable_total_count);
METRIC_VAR_DECLARE_gauge_int64(rdb_l0_hit_count);
METRIC_VAR_DECLARE_gauge_int64(rdb_l1_hit_count);
METRIC_VAR_DECLARE_gauge_int64(rdb_l2_and_up_hit_count);
METRIC_VAR_DECLARE_gauge_int64(rdb_write_amplification);
METRIC_VAR_DECLARE_gauge_int64(rdb_read_amplification);
METRIC_VAR_DECLARE_gauge_int64(rdb_bloom_filter_seek_negatives);
METRIC_VAR_DECLARE_gauge_int64(rdb_bloom_filter_seek_total);
METRIC_VAR_DECLARE_gauge_int64(rdb_bloom_filter_point_lookup_negatives);
METRIC_VAR_DECLARE_gauge_int64(rdb_bloom_filter_point_lookup_positives);
METRIC_VAR_DECLARE_gauge_int64(rdb_bloom_filter_point_lookup_true_positives);
};
} // namespace server
} // namespace pegasus