src/types/redis_tdigest.h (62 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #pragma once #include <rocksdb/db.h> #include <rocksdb/slice.h> #include <rocksdb/status.h> #include <vector> #include "storage/redis_db.h" #include "storage/redis_metadata.h" #include "storage/storage.h" #include "tdigest.h" namespace redis { inline constexpr uint32_t kTDigestMaxCompression = 1000; // limit the compression to 1k struct CentroidWithKey { Centroid centroid; rocksdb::Slice key; }; struct TDigestCreateOptions { uint32_t compression; }; struct TDigestQuantitleResult { std::vector<double> quantiles; }; class TDigest : public SubKeyScanner { public: using Slice = rocksdb::Slice; explicit TDigest(engine::Storage* storage, const std::string& ns) : SubKeyScanner(storage, ns), cf_handle_(storage->GetCFHandle(ColumnFamilyID::PrimarySubkey)) {} /** * @brief Create a t-digest structure. * * @param ctx The context of the operation. * @param digest_name The name of the t-digest. * @param options The options of the t-digest. * @param exists The output parameter to indicate whether the t-digest already exists. * @return rocksdb::Status */ rocksdb::Status Create(engine::Context& ctx, const Slice& digest_name, const TDigestCreateOptions& options, bool* exists); rocksdb::Status Add(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& inputs); rocksdb::Status Quantile(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& qs, TDigestQuantitleResult* result); rocksdb::Status Reset(engine::Context& ctx, const Slice& digest_name); rocksdb::Status GetMetaData(engine::Context& context, const Slice& digest_name, TDigestMetadata* metadata); private: enum class SegmentType : uint8_t { kBuffer = 0, kCentroids = 1, kGuardFlag = 0xFF }; rocksdb::ColumnFamilyHandle* cf_handle_; rocksdb::Status getMetaDataByNsKey(engine::Context& context, const Slice& digest_name, TDigestMetadata* metadata); rocksdb::Status appendBuffer(engine::Context& ctx, ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch, const std::string& ns_key, const std::vector<double>& inputs, TDigestMetadata* metadata); rocksdb::Status dumpCentroids(engine::Context& ctx, const std::string& ns_key, const TDigestMetadata& metadata, std::vector<Centroid>* centroids) { return dumpCentroidsAndBuffer(ctx, ns_key, metadata, centroids, nullptr, nullptr); } /** * @brief Dumps the centroids and buffer of the t-digest. * * This function reads the centroids and buffer from persistent storage and removes them from the storage. * @param ctx The context of the operation. * @param ns_key The namespace key of the t-digest. * @param metadata The metadata of the t-digest. * @param centroids The output vector to store the centroids. * @param buffer The output vector to store the buffer. If it is nullptr, the buffer will not be read. * @param clean_after_dump_batch The write batch to store the clean operations. If it is nullptr, the clean operations * @return rocksdb::Status */ rocksdb::Status dumpCentroidsAndBuffer(engine::Context& ctx, const std::string& ns_key, const TDigestMetadata& metadata, std::vector<Centroid>* centroids, std::vector<double>* buffer, ObserverOrUniquePtr<rocksdb::WriteBatchBase>* clean_after_dump_batch); rocksdb::Status applyNewCentroids(ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch, const std::string& ns_key, const TDigestMetadata& metadata, const std::vector<Centroid>& centroids); std::string internalSegmentGuardPrefixKey(const TDigestMetadata& metadata, const std::string& ns_key, SegmentType seg) const; rocksdb::Status mergeCurrentBuffer(engine::Context& ctx, const std::string& ns_key, ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch, TDigestMetadata* metadata, const std::vector<double>* additional_buffer = nullptr); std::string internalBufferKey(const std::string& ns_key, const TDigestMetadata& metadata) const; std::string internalKeyFromCentroid(const std::string& ns_key, const TDigestMetadata& metadata, const Centroid& centroid, uint32_t seq) const; static std::string internalValueFromCentroid(const Centroid& centroid); rocksdb::Status decodeCentroidFromKeyValue(const rocksdb::Slice& key, const rocksdb::Slice& value, Centroid* centroid) const; }; } // namespace redis