extensions/rocksdb-repos/FlowFileRepository.cpp

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "FlowFileRepository.h"

#include <chrono>
#include <list>
#include <memory>
#include <optional>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "utils/gsl.h"
#include "core/Resource.h"
#include "utils/OptionalUtils.h"
#include "core/TypedValues.h"
#include "FlowFileRecord.h"

using namespace std::literals::chrono_literals;

namespace org::apache::nifi::minifi::core::repository {

void FlowFileRepository::flush() {
  auto opendb = db_->open();
  if (!opendb) {
    return;
  }
  auto batch = opendb->createWriteBatch();
  std::list<ExpiredFlowFileInfo> flow_files;

  while (keys_to_delete_.size_approx() > 0) {
    ExpiredFlowFileInfo info;
    if (keys_to_delete_.try_dequeue(info)) {
      flow_files.push_back(std::move(info));
    }
  }

  deserializeFlowFilesWithNoContentClaim(opendb.value(), flow_files);

  for (auto& ff : flow_files) {
    batch.Delete(ff.key);
    logger_->log_debug("Issuing batch delete, including {}, Content path {}", ff.key, ff.content ? ff.content->getContentFullPath() : "null");
  }

  auto operation = [&batch, &opendb]() { return opendb->Write(rocksdb::WriteOptions(), &batch); };

  if (!ExecuteWithRetry(operation)) {
    for (auto&& ff : flow_files) {
      keys_to_delete_.enqueue(std::move(ff));
    }
    return;  // Stop here - don't delete from content repo while we have records in FF repo
  }

  if (content_repo_) {
    for (auto& ff : flow_files) {
      if (ff.content) {
        ff.content->decreaseFlowFileRecordOwnedCount();
      }
    }
  }
}

void FlowFileRepository::deserializeFlowFilesWithNoContentClaim(minifi::internal::OpenRocksDb& opendb, std::list<ExpiredFlowFileInfo>& flow_files) {
  std::vector<rocksdb::Slice> keys;
  std::vector<std::list<ExpiredFlowFileInfo>::iterator> key_positions;
  for (auto it = flow_files.begin(); it != flow_files.end(); ++it) {
    if (!it->content) {
      keys.push_back(it->key);
      key_positions.push_back(it);
    }
  }
  if (keys.empty()) {
    return;
  }
  std::vector<std::string> values;
  rocksdb::ReadOptions options;
  options.verify_checksums = verify_checksums_in_rocksdb_reads_;
  auto multistatus = opendb.MultiGet(options, keys, &values);
  gsl_Expects(keys.size() == values.size() && values.size() == multistatus.size());

  for (size_t i = 0; i < keys.size(); ++i) {
    if (!multistatus[i].ok()) {
      logger_->log_error("Failed to read key from rocksdb: {}! DB is most probably in an inconsistent state!", keys[i].data());
      flow_files.erase(key_positions.at(i));
      continue;
    }
    utils::Identifier container_id;
    auto flow_file = FlowFileRecord::DeSerialize(gsl::make_span(values[i]).as_span<const std::byte>(), content_repo_, container_id);
    if (flow_file) {
      gsl_Expects(flow_file->getUUIDStr() == key_positions.at(i)->key);
      key_positions.at(i)->content = flow_file->getResourceClaim();
    } else {
      logger_->log_error("Could not deserialize flow file {}", key_positions.at(i)->key);
    }
  }
}

void FlowFileRepository::run() {
  while (isRunning()) {
    std::this_thread::sleep_for(purge_period_);
    flush();
  }
  flush();
}

bool FlowFileRepository::contentSizeIsAmpleForFlowFile(const FlowFile& flow_file_record, const std::shared_ptr<ResourceClaim>& resource_claim) const {
  const auto stream_size = resource_claim ? content_repo_->size(*resource_claim) : 0;
  const auto required_size = flow_file_record.getOffset() + flow_file_record.getSize();
  return stream_size >= required_size;
}

Connectable* FlowFileRepository::getContainer(const std::string& container_id) {
  auto container = containers_.find(container_id);
  if (container != containers_.end())
    return container->second;
  // for backward compatibility
  container = connection_map_.find(container_id);
  if (container != connection_map_.end())
    return container->second;
  return nullptr;
}

void FlowFileRepository::initialize_repository() {
  auto opendb = db_->open();
  if (!opendb) {
    logger_->log_trace("Couldn't open database to load existing flow files");
    return;
  }
  logger_->log_info("Reading existing flow files from database");

  rocksdb::ReadOptions options;
  options.verify_checksums = verify_checksums_in_rocksdb_reads_;
  const auto it = opendb->NewIterator(options);

  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    utils::Identifier container_id;
    auto eventRead = FlowFileRecord::DeSerialize(gsl::make_span(it->value()).as_span<const std::byte>(), content_repo_, container_id);
    const std::string key = it->key().ToString();
    if (!eventRead) {
      // failed to deserialize FlowFile, cannot clear claim
      keys_to_delete_.enqueue({.key = key});
      continue;
    }
    auto claim = eventRead->getResourceClaim();
    if (claim) {
      claim->increaseFlowFileRecordOwnedCount();
    }
    const auto container = getContainer(container_id.to_string());
    if (!container) {
      logger_->log_warn("Could not find connection for {}, path {}", container_id.to_string(), eventRead->getContentFullPath());
      keys_to_delete_.enqueue({.key = key, .content = eventRead->getResourceClaim()});
      continue;
    }
    if (check_flowfile_content_size_ && !contentSizeIsAmpleForFlowFile(*eventRead, claim)) {
      logger_->log_warn("Content is missing or too small for flowfile {}", eventRead->getContentFullPath());
      keys_to_delete_.enqueue({.key = key, .content = eventRead->getResourceClaim()});
      continue;
    }
    logger_->log_debug("Found connection for {}, path {}", container_id.to_string(), eventRead->getContentFullPath());
    eventRead->setStoredToRepository(true);
    // we found the connection for the persistent flowFile
    // even if a processor immediately marks it for deletion, flush only happens after prune_stored_flowfiles
    container->restore(eventRead);
  }
  flush();
  content_repo_->clearOrphans();
}

void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) {
  content_repo_ = content_repo;
  swap_loader_ = std::make_unique<FlowFileLoader>(gsl::make_not_null(db_.get()), content_repo_, verify_checksums_in_rocksdb_reads_);

  initialize_repository();
}
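
// A summary sketch of the recovery pass in initialize_repository() above; per
// persisted record, one of four decisions is made:
//   - record fails to deserialize            -> enqueue the key for deletion (claim unknown, nothing to release)
//   - owning connection cannot be found      -> enqueue the key and its content claim for deletion
//   - content missing or too small (opt-in)  -> enqueue the key and its content claim for deletion
//   - otherwise                              -> mark as stored and restore() it into its connection
// Deletions are deferred via keys_to_delete_ and applied in batch by flush().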

namespace {

bool getRepositoryCheckHealth(const Configure& configure) {
  std::string check_health_str;
  configure.get(Configure::nifi_flow_file_repository_check_health, check_health_str);
  return utils::string::toBool(check_health_str).value_or(true);
}

}  // namespace

bool FlowFileRepository::initialize(const std::shared_ptr<Configure> &configure) {
  config_ = configure;
  std::string value;

  if (configure->get(Configure::nifi_flowfile_repository_directory_default, value) && !value.empty()) {
    directory_ = value;
  }
  check_flowfile_content_size_ = getRepositoryCheckHealth(*configure);

  logger_->log_debug("NiFi FlowFile Repository Directory {}", directory_);

  setCompactionPeriod(configure);

  const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configure->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
  logger_->log_info("Using {} FlowFileRepository", encrypted_env ? "encrypted" : "plaintext");
  verify_checksums_in_rocksdb_reads_ = (configure->get(Configure::nifi_flowfile_repository_rocksdb_read_verify_checksums) | utils::andThen(&utils::string::toBool)).value_or(false);
  logger_->log_debug("{} checksum verification in FlowFileRepository", verify_checksums_in_rocksdb_reads_ ? "Using" : "Not using");

  auto db_options = [encrypted_env] (minifi::internal::Writable<rocksdb::DBOptions>& options) {
    minifi::internal::setCommonRocksDbOptions(options);
    if (encrypted_env) {
      options.set(&rocksdb::DBOptions::env, encrypted_env.get(), EncryptionEq{});
    } else {
      options.set(&rocksdb::DBOptions::env, rocksdb::Env::Default());
    }
  };

  // Write buffers are used as db operation logs. When they get filled the events are merged and serialized.
  // The default size is 64MB.
  // In our case it's usually too much, causing sawtooth in memory consumption. (Consumes more than the whole MiniFi)
  // To avoid DB write issues during heavy load it's recommended to have a high number of buffers.
  // Rocksdb's stall feature can also trigger in case the number of buffers is >= 3.
  // The more buffers we have the more memory rocksdb can utilize without significant memory consumption under low load.
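  // A rough worst-case sketch of the numbers below, assuming all buffers fill before
  // a flush completes: 20 buffers * 8 MiB = 160 MiB of write buffer memory at peak,
  // while under low load typically only a single 8 MiB buffer is active, instead of
  // one mostly empty 64 MB default buffer.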
  auto cf_options = [&configure] (rocksdb::ColumnFamilyOptions& cf_opts) {
    cf_opts.OptimizeForPointLookup(4);
    cf_opts.write_buffer_size = 8ULL << 20U;
    cf_opts.max_write_buffer_number = 20;
    cf_opts.min_write_buffer_number_to_merge = 1;
    if (auto compression_type = minifi::internal::readConfiguredCompressionType(configure, Configure::nifi_flow_repository_rocksdb_compression)) {
      cf_opts.compression = *compression_type;
    }
  };
  db_ = minifi::internal::RocksDatabase::create(db_options, cf_options, directory_,
      minifi::internal::getRocksDbOptionsToOverride(configure, Configure::nifi_flowfile_repository_rocksdb_options));
  if (db_->open()) {
    logger_->log_debug("NiFi FlowFile Repository database open {} success", directory_);
    return true;
  } else {
    logger_->log_error("NiFi FlowFile Repository database open {} fail", directory_);
    return false;
  }
}

void FlowFileRepository::setCompactionPeriod(const std::shared_ptr<Configure> &configure) {
  compaction_period_ = DEFAULT_COMPACTION_PERIOD;
  if (auto compaction_period_str = configure->get(Configure::nifi_flowfile_repository_rocksdb_compaction_period)) {
    if (auto compaction_period = TimePeriodValue::fromString(compaction_period_str.value())) {
      compaction_period_ = compaction_period->getMilliseconds();
      if (compaction_period_.count() == 0) {
        logger_->log_warn("Setting '{}' to 0 disables forced compaction", Configure::nifi_flowfile_repository_rocksdb_compaction_period);
      }
    } else {
      logger_->log_error("Malformed property '{}', expected time period, using default", Configure::nifi_flowfile_repository_rocksdb_compaction_period);
    }
  } else {
    logger_->log_debug("Using default compaction period of {}", compaction_period_);
  }
}

bool FlowFileRepository::Delete(const std::string& key) {
  keys_to_delete_.enqueue({.key = key});
  return true;
}

void FlowFileRepository::runCompaction() {
  do {
    if (auto opendb = db_->open()) {
      auto status = opendb->RunCompaction();
      logger_->log_trace("Compaction triggered: {}", status.ToString());
    } else {
      logger_->log_error("Failed to open database for compaction");
    }
  } while (!utils::StoppableThread::waitForStopRequest(compaction_period_));
}

bool FlowFileRepository::start() {
  const bool ret = ThreadedRepositoryImpl::start();
  if (swap_loader_) {
    swap_loader_->start();
  }
  if (compaction_period_.count() != 0) {
    compaction_thread_ = std::make_unique<utils::StoppableThread>([this] () {
      runCompaction();
    });
  }
  return ret;
}

bool FlowFileRepository::stop() {
  compaction_thread_.reset();
  if (swap_loader_) {
    swap_loader_->stop();
  }
  return ThreadedRepositoryImpl::stop();
}

void FlowFileRepository::store(std::vector<std::shared_ptr<core::FlowFile>> flow_files) {
  gsl_Expects(ranges::all_of(flow_files, &FlowFile::isStored));
  // pass, flowfiles are already persisted in the repository
}

std::future<std::vector<std::shared_ptr<core::FlowFile>>> FlowFileRepository::load(std::vector<SwappedFlowFile> flow_files) {
  return swap_loader_->load(std::move(flow_files));
}

bool FlowFileRepository::Delete(const std::shared_ptr<core::CoreComponent>& item) {
  if (auto ff = std::dynamic_pointer_cast<core::FlowFile>(item)) {
    keys_to_delete_.enqueue({.key = item->getUUIDStr(), .content = ff->getResourceClaim()});
  } else {
    keys_to_delete_.enqueue({.key = item->getUUIDStr()});
  }
  return true;
}

REGISTER_RESOURCE_AS(FlowFileRepository, InternalResource, ("FlowFileRepository", "flowfilerepository"));

}  // namespace org::apache::nifi::minifi::core::repository
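
// Typical lifecycle, as an illustrative sketch only (the actual wiring is done by
// the flow controller; `repo`, `configure` and `content_repo` are placeholders, and
// constructor arguments are elided):
//
//   repo->initialize(configure);         // open or create the RocksDB instance
//   repo->loadComponent(content_repo);   // restore persisted flow files into their connections
//   repo->start();                       // start the purge thread, swap loader and compaction thread
//   repo->Delete(flow_file_uuid);        // queue a record for deletion; applied on the next flush()
//   repo->stop();                        // stop compaction and swap loader, then the purge thread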