extensions/libarchive/BinFiles.h (242 lines of code) (raw):

/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <cinttypes> #include <limits> #include <deque> #include <memory> #include <unordered_set> #include <string> #include <set> #include <map> #include <utility> #include "FlowFileRecord.h" #include "core/Processor.h" #include "core/ProcessSession.h" #include "core/PropertyDefinitionBuilder.h" #include "minifi-cpp/core/PropertyValidator.h" #include "core/RelationshipDefinition.h" #include "core/logging/LoggerFactory.h" #include "utils/gsl.h" #include "utils/Id.h" #include "utils/Export.h" #include "core/FlowFileStore.h" namespace org::apache::nifi::minifi::processors { class Bin { public: explicit Bin(const uint64_t &minSize, const uint64_t &maxSize, const size_t &minEntries, const size_t & maxEntries, std::string fileCount, std::string groupId) : minSize_(minSize), maxSize_(maxSize), maxEntries_(maxEntries), minEntries_(minEntries), fileCount_(std::move(fileCount)), groupId_(std::move(groupId)) { queued_data_size_ = 0; creation_dated_ = std::chrono::system_clock::now(); uuid_ = utils::IdGenerator::getIdGenerator()->generate(); logger_->log_debug("Bin {} for group {} created", getUUIDStr(), groupId_); } virtual ~Bin() { logger_->log_debug("Bin {} for group {} destroyed", getUUIDStr(), groupId_); } [[nodiscard]] bool isFull() const { return queued_data_size_ >= maxSize_ || queue_.size() >= maxEntries_; } // check whether the bin meet the min required size and entries so that it can be processed for merge [[nodiscard]] bool isReadyForMerge() const { return closed_ || isFull() || (queued_data_size_ >= minSize_ && queue_.size() >= minEntries_); } [[nodiscard]] bool isOlderThan(const std::chrono::milliseconds duration) const { return std::chrono::system_clock::now() > (creation_dated_ + duration); } std::deque<std::shared_ptr<core::FlowFile>>& getFlowFile() { return queue_; } bool offer(const std::shared_ptr<core::FlowFile>& flow) { if (!fileCount_.empty()) { std::string value; if (flow->getAttribute(fileCount_, value)) { try { // for defrag case using the identification size_t count = std::stoul(value); maxEntries_ = count; minEntries_ = count; } catch (...) { } } } if ((queued_data_size_ + flow->getSize()) > maxSize_ || (queue_.size() + 1) > maxEntries_) { closed_ = true; return false; } queue_.push_back(flow); queued_data_size_ += flow->getSize(); logger_->log_debug("Bin {} for group {} offer size {} byte {} min_entry {} max_entry {}", getUUIDStr(), groupId_, queue_.size(), queued_data_size_, minEntries_, maxEntries_); return true; } [[nodiscard]] std::chrono::system_clock::time_point getCreationDate() const { return creation_dated_; } [[nodiscard]] int getSize() const { return gsl::narrow<int>(queue_.size()); } [[nodiscard]] utils::SmallString<36> getUUIDStr() const { return uuid_.to_string(); } [[nodiscard]] std::string getGroupId() const { return groupId_; } private: uint64_t minSize_; uint64_t maxSize_; size_t maxEntries_; size_t minEntries_; uint64_t queued_data_size_; bool closed_{false}; std::deque<std::shared_ptr<core::FlowFile>> queue_; std::chrono::system_clock::time_point creation_dated_; std::string fileCount_; std::string groupId_; std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<Bin>::getLogger(); utils::Identifier uuid_; }; class BinManager { public: virtual ~BinManager() { purge(); } void setMinSize(uint64_t size) { minSize_ = {size}; } void setMaxSize(uint64_t size) { maxSize_ = {size}; } void setMaxEntries(uint32_t entries) { maxEntries_ = {entries}; } void setMinEntries(uint32_t entries) { minEntries_ = {entries}; } void setBinAge(std::chrono::milliseconds age) { binAge_ = age; } [[nodiscard]] int getBinCount() const { return binCount_; } void setFileCount(const std::string &value) { fileCount_ = value; } void purge() { std::lock_guard<std::mutex> lock(mutex_); groupBinMap_.clear(); binCount_ = 0; } // Adds the given flowFile to the first available bin in which it fits for the given group or creates a new bin in the specified group if necessary. bool offer(const std::string &group, const std::shared_ptr<core::FlowFile>& flow); // gather ready bins once the bin are full enough or exceed bin age void gatherReadyBins(); // marks oldest bin as ready void removeOldestBin(); void getReadyBin(std::deque<std::unique_ptr<Bin>> &retBins); void addReadyBin(std::unique_ptr<Bin> ready_bin); private: std::mutex mutex_; uint64_t minSize_{0}; uint64_t maxSize_{std::numeric_limits<decltype(maxSize_)>::max()}; uint32_t maxEntries_{std::numeric_limits<decltype(maxEntries_)>::max()}; uint32_t minEntries_{1}; std::string fileCount_; std::chrono::milliseconds binAge_{std::chrono::milliseconds::max()}; std::map<std::string, std::unique_ptr<std::deque<std::unique_ptr<Bin>>> >groupBinMap_; std::deque<std::unique_ptr<Bin>> readyBin_; int binCount_{0}; std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<BinManager>::getLogger()}; }; class BinFiles : public core::ProcessorImpl { protected: static const core::Relationship Self; public: using core::ProcessorImpl::ProcessorImpl; ~BinFiles() override = default; EXTENSIONAPI static constexpr const char* Description = "Bins flow files into buckets based on the number of entries or size of entries"; EXTENSIONAPI static constexpr auto MinSize = core::PropertyDefinitionBuilder<>::createProperty("Minimum Group Size") .withDescription("The minimum size of for the bundle") .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR) .withDefaultValue("0") .build(); EXTENSIONAPI static constexpr auto MaxSize = core::PropertyDefinitionBuilder<>::createProperty("Maximum Group Size") .withDescription("The maximum size for the bundle. If not specified, there is no maximum.") .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR) .build(); EXTENSIONAPI static constexpr auto MinEntries = core::PropertyDefinitionBuilder<>::createProperty("Minimum Number of Entries") .withDescription("The minimum number of files to include in a bundle") .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR) .withDefaultValue("1") .build(); EXTENSIONAPI static constexpr auto MaxEntries = core::PropertyDefinitionBuilder<>::createProperty("Maximum Number of Entries") .withDescription("The maximum number of files to include in a bundle. If not specified, there is no maximum.") .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR) .build(); EXTENSIONAPI static constexpr auto MaxBinAge = core::PropertyDefinitionBuilder<>::createProperty("Max Bin Age") .withDescription("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit>") .withValidator(core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR) .build(); EXTENSIONAPI static constexpr auto MaxBinCount = core::PropertyDefinitionBuilder<>::createProperty("Maximum number of Bins") .withDescription("Specifies the maximum number of bins that can be held in memory at any one time") .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR) .withDefaultValue("100") .build(); EXTENSIONAPI static constexpr auto BatchSize = core::PropertyDefinitionBuilder<>::createProperty("Batch Size") .withDescription("Maximum number of FlowFiles processed in a single session") .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR) .withDefaultValue("1") .build(); EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({ MinSize, MaxSize, MinEntries, MaxEntries, MaxBinCount, MaxBinAge, BatchSize }); EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "If the bundle cannot be created, all FlowFiles that would have been used to create the bundle will be transferred to failure"}; EXTENSIONAPI static constexpr auto Original = core::RelationshipDefinition{"original", "The FlowFiles that were used to create the bundle"}; EXTENSIONAPI static constexpr auto Relationships = std::array{ Failure, Original }; EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED; EXTENSIONAPI static constexpr bool IsSingleThreaded = false; ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS EXTENSIONAPI static const char *FRAGMENT_ID_ATTRIBUTE; EXTENSIONAPI static const char *FRAGMENT_INDEX_ATTRIBUTE; EXTENSIONAPI static const char *FRAGMENT_COUNT_ATTRIBUTE; EXTENSIONAPI static const char *SEGMENT_ID_ATTRIBUTE; EXTENSIONAPI static const char *SEGMENT_INDEX_ATTRIBUTE; EXTENSIONAPI static const char *SEGMENT_COUNT_ATTRIBUTE; EXTENSIONAPI static const char *SEGMENT_ORIGINAL_FILENAME; EXTENSIONAPI static const char *TAR_PERMISSIONS_ATTRIBUTE; void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; void initialize() override; void restore(const std::shared_ptr<core::FlowFile>& flowFile) override; std::set<core::Connectable*> getOutGoingConnections(const std::string &relationship) override; protected: // Allows general pre-processing of a flow file before it is offered to a bin. This is called before getGroupId(). virtual void preprocessFlowFile(const std::shared_ptr<core::FlowFile>& flow); // Returns a group ID representing a bin. This allows flow files to be binned into like groups virtual std::string getGroupId(const std::shared_ptr<core::FlowFile>& /*flow*/) { return ""; } virtual bool processBin(core::ProcessSession& /*session*/, std::unique_ptr<Bin>& /*bin*/) { return false; } static void transferFlowsToFail(core::ProcessSession &session, std::unique_ptr<Bin> &bin); static void addFlowsToSession(core::ProcessSession &session, std::unique_ptr<Bin> &bin); // Sort flow files retrieved from the flow file repository after restart to their respective bins bool resurrectFlowFiles(core::ProcessSession &session); bool assumeOwnershipOfNextBatch(core::ProcessSession &session); std::deque<std::unique_ptr<Bin>> gatherReadyBins(core::ProcessContext &context); void processReadyBins(std::deque<std::unique_ptr<Bin>> ready_bins, core::ProcessSession &session); BinManager binManager_; private: uint32_t batchSize_{1}; uint32_t maxBinCount_{100}; core::FlowFileStore file_store_; }; } // namespace org::apache::nifi::minifi::processors