libminifi/include/core/repository/AtomicRepoEntries.h (286 lines of code) (raw):

/** * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ref_count_hip. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <span> #include <atomic> #include <chrono> #include <cstddef> #include <cstring> #include <functional> #include <iostream> #include <iterator> #include <map> #include <string> #include <utility> #include <vector> #include "utils/gsl.h" namespace org::apache::nifi::minifi::core::repository { /** * Purpose: Repo value represents an item that will support a move operation within an AtomicEntry * * Justification: Since AtomicEntry is a static entry that does not move or change, the underlying * RepoValue can be changed to support atomic operations. */ template<typename T> class RepoValue { public: RepoValue() = default; /** * Constructor that populates the item allowing for a custom key comparator. * @param key key for this repo value. * @param ptr buffer * @param size size buffer * @param comparator custom comparator. */ explicit RepoValue(T key, const uint8_t *ptr, size_t size, std::function<bool(T, T)> comparator = nullptr) : key_(key), comparator_(comparator) { if (nullptr == ptr) { size = 0; } buffer_.resize(size); if (size > 0) { std::memcpy(buffer_.data(), ptr, size); } } /** * RepoValue that moves the other object into this. */ explicit RepoValue(RepoValue<T> &&other) noexcept : key_(std::move(other.key_)), buffer_(std::move(other.buffer_)), comparator_(std::move(other.comparator_)) { } ~RepoValue() = default; T &getKey() { return key_; } /** * Sets the key, relacing the custom comparator if needed. */ void setKey(const T key, std::function<bool(T, T)> comparator = nullptr) { key_ = key; comparator_ = comparator; } /** * Determines if the key is the same using the custom comparator * @param other object to compare against * @return result of the comparison */ inline bool isEqual(RepoValue<T> *other) { return comparator_ == nullptr ? key_ == other->key_ : comparator_(key_, other->key_); } /** * Determines if the key is the same using the custom comparator * @param other object to compare against * @return result of the comparison */ inline bool isKey(T other) { return comparator_ == nullptr ? key_ == other : comparator_(key_, other); } /** * Clears the buffer. */ void clearBuffer() { buffer_.resize(0); buffer_.clear(); } [[nodiscard]] size_t size() const noexcept { return buffer_.size(); } [[nodiscard]] std::span<const std::byte> getBuffer() const { return buffer_; } /** * Places the contents of buffer into str * @param strnig into which we are placing the memory contained in buffer. */ void emplace(std::string &str) { str.insert(0, reinterpret_cast<const char*>(buffer_.data()), buffer_.size()); } /** * Appends data to the end of buffer. * @param data data to add to buffer_ */ void append(std::span<const std::byte> data) { buffer_.insert(buffer_.end(), std::begin(data), std::end(data)); } RepoValue<T> &operator=(RepoValue<T> &&other) noexcept { key_ = std::move(other.key_); buffer_ = std::move(other.buffer_); return *this; } private: T key_; std::function<bool(T, T)> comparator_; std::vector<std::byte> buffer_; }; /** * Purpose: Atomic Entry allows us to create a statically * sized ring buffer, with the ability to create * **/ template<typename T> class AtomicEntry { public: /** * Constructor that accepts a max size and an atomic counter for the total * size allowd by this and other atomic entries. */ explicit AtomicEntry(std::atomic<size_t> *total_size, size_t *max_size) : accumulated_repo_size_(total_size), max_repo_size_(max_size), write_pending_(false), has_value_(false), ref_count_(0), free_required(false) { } /** * Sets the repo value, moving the old value into old_value. * @param new_value new value to move into value_. * @param old_value the previous value of value_ will be moved into old_value * @param prev_size size reclaimed. * @return result of this set. If true old_value will be populated. */ bool setRepoValue(RepoValue<T> &new_value, RepoValue<T> &old_value, size_t &prev_size) { // delete the underlying pointer bool lock = false; if (!write_pending_.compare_exchange_weak(lock, true)) { return false; } if (has_value_) { prev_size = value_.size(); } old_value = std::move(value_); value_ = std::move(new_value); has_value_ = true; try_unlock(); return true; } AtomicEntry<T> *takeOwnership() { bool lock = false; if (!write_pending_.compare_exchange_weak(lock, true)) return nullptr; ref_count_++; try_unlock(); return this; } /** * A test and set operation, which is used to allow a function to test * if an item can be released and a function used for reclaiming memory associated * with said object. * A custom comparator can be provided to augment the key being added into value_ */ bool testAndSetKey(const T str, std::function<bool(T)> releaseTest = nullptr, std::function<void(T)> reclaimer = nullptr, std::function<bool(T, T)> comparator = nullptr) { bool lock = false; if (!write_pending_.compare_exchange_weak(lock, true)) return false; if (has_value_) { // we either don't have a release test or we cannot release this // entity if (releaseTest != nullptr && reclaimer != nullptr && releaseTest(value_.getKey())) { reclaimer(value_.getKey()); } else if (free_required && ref_count_ == 0) { size_t bufferSize = value_.getBuffer().size(); value_.clearBuffer(); has_value_ = false; if (accumulated_repo_size_ != nullptr) { *accumulated_repo_size_ -= bufferSize; } free_required = false; } else { try_unlock(); return false; } } ref_count_ = 1; value_.setKey(str, comparator); has_value_ = true; try_unlock(); return true; } /** * Moved the value into the argument * @param value the previous value will be moved into this parameter * @return success of get operation based on whether or not this atomic entry has a value. */ bool getValue(RepoValue<T> &value) { try_lock(); if (!has_value_) { try_unlock(); return false; } value = std::move(value_); has_value_ = false; try_unlock(); return true; } /** * Moved the value into the argument * @param value the previous value will be moved into this parameter * @return success of get operation based on whether or not this atomic entry has a value. */ bool getValue(const T &key, RepoValue<T> &value) { try_lock(); if (!has_value_) { try_unlock(); return false; } if (!value_.isKey(key)) { try_unlock(); return false; } value = std::move(value_); has_value_ = false; try_unlock(); return true; } void decrementOwnership() { try_lock(); if (!has_value_) { try_unlock(); return; } if (ref_count_ > 0) { ref_count_--; } if (ref_count_ == 0 && free_required) { size_t bufferSize = value_.getBuffer().size(); value_.clearBuffer(); has_value_ = false; if (accumulated_repo_size_ != nullptr) { *accumulated_repo_size_ -= bufferSize; } free_required = false; } else { } try_unlock(); } /** * Moved the value into the argument * @param value the previous value will be moved into this parameter * @return success of get operation based on whether or not this atomic entry has a value. */ bool getValue(const T &key, RepoValue<T> **value) { try_lock(); if (!has_value_) { try_unlock(); return false; } if (!value_.isKey(key)) { try_unlock(); return false; } ref_count_++; *value = &value_; try_unlock(); return true; } /** * Operation that will be used to test and free if a release is required without * setting a new object. * @param releaseTest function that will be used to free the RepoValue key from * this atomic entry. * @param freedValue informs the caller if an item was freed. */ T testAndFree(std::function<bool(T)> releaseTest, bool& /*freedValue*/) { try_lock(); T ref; if (!has_value_) { try_unlock(); return ref; } if (releaseTest(value_.getKey())) { size_t bufferSize = value_.getBufferSize(); value_.clearBuffer(); ref = value_.getKey(); has_value_ = false; if (accumulated_repo_size_ != nullptr) { *accumulated_repo_size_ -= bufferSize; } } try_unlock(); return ref; } size_t getLength() { size_t size = 0; try_lock(); size = value_.getBuffer().size(); try_unlock(); return size; } /** * sets has_value to false; however, does not call * any external entity to further free RepoValue */ bool freeValue(const T &key) { try_lock(); if (!has_value_) { try_unlock(); return false; } if (!value_.isKey(key)) { try_unlock(); return false; } if (ref_count_ > 0) { free_required = true; try_unlock(); return true; } size_t bufferSize = value_.getBuffer().size(); value_.clearBuffer(); has_value_ = false; if (accumulated_repo_size_ != nullptr) { *accumulated_repo_size_ -= bufferSize; } free_required = false; try_unlock(); return true; } /** * Appends buffer onto this atomic entry if key matches * the current RepoValue's key. */ bool insert(const T key, std::span<const std::byte> buffer) { try_lock(); if (!has_value_) { try_unlock(); return false; } if (!value_.isKey(key)) { try_unlock(); return false; } if ((accumulated_repo_size_ != nullptr && max_repo_size_ != nullptr) && (*accumulated_repo_size_ + buffer.size() > *max_repo_size_)) { // can't support this write try_unlock(); return false; } value_.append(buffer); (*accumulated_repo_size_) += buffer.size(); try_unlock(); return true; } private: /** * Spin lock to unlock the current atomic entry. */ inline void try_lock() { bool lock = false; while (!write_pending_.compare_exchange_weak(lock, true, std::memory_order_acquire)) { lock = false; // attempt again } } /** * Spin lock to unlock the current atomic entry. */ inline void try_unlock() { bool lock = true; while (!write_pending_.compare_exchange_weak(lock, false, std::memory_order_acquire)) { lock = true; // attempt again } } // atomic size pointer. std::atomic<size_t> *accumulated_repo_size_; // max size size_t *max_repo_size_; // determines if a write is pending. std::atomic<bool> write_pending_; // used to determine if a value is present in this atomic entry. std::atomic<bool> has_value_; std::atomic<uint16_t> ref_count_; std::atomic<bool> free_required; // repo value. RepoValue<T> value_; }; } // namespace org::apache::nifi::minifi::core::repository