src/kudu/consensus/log_cache.h (123 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef KUDU_CONSENSUS_LOG_CACHE_H
#define KUDU_CONSENSUS_LOG_CACHE_H

#include <atomic>
#include <cstdint>
#include <iosfwd>
#include <map>
#include <memory>
#include <string>
#include <vector>

#include <boost/optional/optional.hpp>
#include <gtest/gtest_prod.h>

#include "kudu/consensus/ref_counted_replicate.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/condition_variable.h"
#include "kudu/util/faststring.h"
#include "kudu/util/metrics.h"
#include "kudu/util/mutex.h"
#include "kudu/util/status.h"
#include "kudu/util/status_callback.h"

namespace kudu {

class CompressionCodec;
class MemTracker;

namespace log {
class Log;
} // namespace log

namespace consensus {

class OpId;
class ReplicateMsg;
struct ReadContext;

// Write-through cache for the log.
//
// This stores a set of log messages by their index. New operations
// can be appended to the end as they are written to the log. Readers
// fetch entries that were explicitly appended, or they can fetch older
// entries which are asynchronously fetched from the disk.
class LogCache { public: LogCache(const scoped_refptr<MetricEntity>& metric_entity, scoped_refptr<log::Log> log, std::string local_uuid, std::string tablet_id); ~LogCache(); // Initialize the cache. // // 'preceding_op' is the current latest op. The next AppendOperation() call // must follow this op. // // Requires that the cache is empty. void Init(const OpId& preceding_op); // Read operations from the log, following 'after_op_index'. // If such an op exists in the log, an OK result will always include at least one // operation. // // The result will be limited such that the total ByteSize() of the returned ops // is less than max_size_bytes, unless that would result in an empty result, in // which case exactly one op is returned. // // The OpId which precedes the returned ops is returned in *preceding_op. // The index of this OpId will match 'after_op_index'. // // If the ops being requested are not available in the log, this will synchronously // read these ops from disk. Therefore, this function may take a substantial amount // of time and should not be called with important locks held, etc. Status ReadOps(int64_t after_op_index, int max_size_bytes, const ReadContext& context, std::vector<ReplicateRefPtr>* messages, OpId* preceding_op); // Similar to ReadOps(...), but blocks for 'max_duration_ms' if // 'after_op_index' is not available in the local log. // // max_duration_ms is the maximum duration to wait for 'after_op_index' (in // milliseconds) // // Returns "Incomplete" if the op has not yet been written to the log even // after waiting for 'max_duration_ms' // Returns "NotFound" if the op has been GCed. // Returns another bad Status if the log index fails to load (eg. due to an IO error). Status BlockingReadOps(int64_t after_op_index, int max_size_bytes, const ReadContext& context, int64_t max_duration_ms, std::vector<ReplicateRefPtr>* messages, OpId* preceding_op); // Append the operations into the log and the cache. 
// When the messages have completed writing into the on-disk log, fires 'callback'. // // If the cache memory limit is exceeded, the entries may no longer be in the cache // when the callback fires. // // Returns non-OK if the Log append itself fails. Status AppendOperations(const std::vector<ReplicateRefPtr>& msgs, const StatusCallback& callback); // Truncate any operations with index > 'index'. // // Following this, reads of truncated indexes using ReadOps(), LookupOpId(), // HasOpBeenWritten(), etc, will return as if the operations were never appended. // // NOTE: unless a new operation is appended followig 'index', this truncation does // not persist across server restarts. void TruncateOpsAfter(int64_t index); // Return true if an operation with the given index has been written through // the cache. The operation may not necessarily be durable yet -- it could still be // en route to the log. bool HasOpBeenWritten(int64_t index) const; // Evict any operations with op index <= 'index'. void EvictThroughOp(int64_t index); // Return the number of bytes of memory currently in use by the cache. int64_t BytesUsed() const; int64_t num_cached_ops() const { return metrics_.log_cache_num_ops->value(); } // Dump the current contents of the cache to the log. void DumpToLog() const; // Dumps the contents of the cache to the provided string vector. void DumpToStrings(std::vector<std::string>* lines) const; void DumpToHtml(std::ostream& out) const; std::string StatsString() const; std::string ToString() const; // Look up the OpId for the given operation index. // If it is not in the cache, this consults the on-disk log index and thus // may take a non-trivial amount of time due to IO. // // Returns "Incomplete" if the op has not yet been written. // Returns "NotFound" if the op has been GCed. // Returns another bad Status if the log index fails to load (eg. due to an IO error). 
Status LookupOpId(int64_t op_index, OpId* op_id) const; // Sets compression codec and updates the internal codec_ atomic pointer Status SetCompressionCodec(const std::string& codec); // Enable (or disable) compression of messages read from log Status EnableCompressionOnCacheMiss(bool enable); private: FRIEND_TEST(LogCacheTest, TestAppendAndGetMessages); FRIEND_TEST(LogCacheTest, TestGlobalMemoryLimit); FRIEND_TEST(LogCacheTest, TestReplaceMessages); FRIEND_TEST(LogCacheTest, TestTruncation); friend class LogCacheTest; // Compress the payload in 'msg' and populate a new ReplicateMsg with // compressed payload in 'compressed_msg'. Uses 'buffer' as a temporary buffer // to hold the compressed message Status CompressMsg(const ReplicateMsg* msg, faststring& buffer, std::unique_ptr<ReplicateMsg>* compressed_msg); // Compress all messages in 'replicate_ptrs' and return the compressed // messages in 'compressed_replicate_ptrs'. If any message is uncompressable, // then it inserts the original msg into 'compressed_replicate_ptrs' Status CompressMsgs(const std::vector<ReplicateMsg*>& replicate_ptrs, std::vector<ReplicateMsg*>* compressed_repliate_ptrs); // Uncompresses the payload of 'msg' based on its compression_codec and // populates a new ReplicateMsg with uncompressed payload in // 'uncompressed_msg'. Uses 'buffer' as a temporary buffer to hold compressed // message Status UncompressMsg( const ReplicateRefPtr& msg, faststring& buffer, std::unique_ptr<ReplicateMsg>* uncompressed_msg); // An entry in the cache. struct CacheEntry { ReplicateRefPtr msg; // The cached value of msg->SpaceUsedLong(). This method is expensive // to compute, so we compute it only once upon insertion. int64_t mem_usage; // The uncompressed size of the msg. 
If msg is not compressed, then it is // same as mem_usage int64_t msg_size; }; // Try to evict the oldest operations from the queue, stopping either when // 'bytes_to_evict' bytes have been evicted, or the op with index // 'stop_after_index' has been evicted, whichever comes first. void EvictSomeUnlocked(int64_t stop_after_index, int64_t bytes_to_evict); // Update metrics and MemTracker to account for the removal of the // given message. void AccountForMessageRemovalUnlocked(const CacheEntry& entry); void TruncateOpsAfterUnlocked(int64_t index); // Return a string with stats std::string StatsStringUnlocked() const; std::string ToStringUnlocked() const; std::string LogPrefixUnlocked() const; void LogCallback(int64_t last_idx_in_batch, bool borrowed_memory, const StatusCallback& user_callback, const Status& log_status); scoped_refptr<log::Log> const log_; // The UUID of the local peer. const std::string local_uuid_; // The id of the tablet. const std::string tablet_id_; mutable Mutex lock_; ConditionVariable next_index_cond_; // An ordered map that serves as the buffer for the cached messages. // Maps from log index -> ReplicateMsg typedef std::map<uint64_t, CacheEntry> MessageCache; MessageCache cache_; // The next log index to append. Each append operation must either // start with this log index, or go backward (but never skip forward). int64_t next_sequential_op_index_; // Any operation with an index >= min_pinned_op_ may not be // evicted from the cache. This is used to prevent ops from being evicted // until they successfully have been appended to the underlying log. // Protected by lock_. int64_t min_pinned_op_index_; // Pointer to a parent memtracker for all log caches. This // exists to compute server-wide cache size and enforce a // server-wide memory limit. 
When the first instance of a log // cache is created, a new entry is added to MemTracker's static // map; subsequent entries merely increment the refcount, so that // the parent tracker can be deleted if all log caches are // deleted (e.g., if all tablets are deleted from a server, or if // the server is shutdown). std::shared_ptr<MemTracker> parent_tracker_; // A MemTracker for this instance. std::shared_ptr<MemTracker> tracker_; struct Metrics { explicit Metrics(const scoped_refptr<MetricEntity>& metric_entity); // Keeps track of the total number of operations in the cache. scoped_refptr<AtomicGauge<int64_t> > log_cache_num_ops; // Keeps track of the memory consumed by the cache, in bytes. scoped_refptr<AtomicGauge<int64_t> > log_cache_size; // Keeps track of uncompressed size of messages cached. This will be same as // log_cache_size if compression is not enabled and on secondaries scoped_refptr<AtomicGauge<int64_t> > log_cache_msg_size; // Payload size of the msg that is written to the log scoped_refptr<Counter> log_cache_payload_size; // Payload size of the compressed msg payload that is sent over the wire // If compression is disabled, it is the same as log_cache_payload_size scoped_refptr<Counter> log_cache_compressed_payload_size; }; Metrics metrics_; // Compression codec to use std::atomic<const CompressionCodec*> codec_; // Temporary buffer to use for compression. This is used during append // operation to compress and/or uncompress payloads. Note that the same buffer // gets reused multiple times - this assumens that AppendOperation is // serialized externally and that there can be only one in-flight append // operation faststring log_cache_compression_buf_; std::atomic<bool> enable_compression_on_cache_miss_; DISALLOW_COPY_AND_ASSIGN(LogCache); }; } // namespace consensus } // namespace kudu #endif /* KUDU_CONSENSUS_LOG_CACHE_H */