extensions/rocksdb-repos/FlowFileLoader.cpp (83 lines of code) (raw):

/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "FlowFileLoader.h" #include <span> #include <memory> #include <string> #include <vector> #include <utility> #include "logging/LoggerConfiguration.h" #include "FlowFileRecord.h" #include "utils/gsl.h" namespace org::apache::nifi::minifi { FlowFileLoader::FlowFileLoader(gsl::not_null<minifi::internal::RocksDatabase*> db, std::shared_ptr<core::ContentRepository> content_repo) : db_(db), content_repo_(std::move(content_repo)), logger_(core::logging::LoggerFactory<FlowFileLoader>::getLogger()) {} FlowFileLoader::~FlowFileLoader() { stop(); } std::future<FlowFileLoader::FlowFilePtrVec> FlowFileLoader::load(std::vector<SwappedFlowFile> flow_files) { auto promise = std::make_shared<std::promise<FlowFilePtrVec>>(); std::future<FlowFilePtrVec> future = promise->get_future(); utils::Worker<utils::TaskRescheduleInfo> task{[this, flow_files = std::move(flow_files), promise = std::move(promise)] { return loadImpl(flow_files, promise); }, "", // doesn't matter that tasks alias by name, as we never actually query their status or stop a single task std::make_unique<utils::ComplexMonitor>()}; // the dummy_future is for the return value of the Worker's lambda, rerunning this lambda // depends on run_determinant + result // we could create a custom run_determinant to instead determine if/when it should be rerun // based on the lambda's return value (e.g. it could return a nonstd::expected<FlowFilePtrVec, TaskRescheduleInfo>) // but then the std::future would also bear this type std::future<utils::TaskRescheduleInfo> dummy_future; thread_pool_.execute(std::move(task), dummy_future); return future; } void FlowFileLoader::start() { thread_pool_.start(); } void FlowFileLoader::stop() { thread_pool_.shutdown(); } utils::TaskRescheduleInfo FlowFileLoader::loadImpl(const std::vector<SwappedFlowFile>& flow_files, const std::shared_ptr<std::promise<FlowFilePtrVec>>& output) { auto opendb = db_->open(); if (!opendb) { logger_->log_error("Couldn't open database to swap-in flow files"); return utils::TaskRescheduleInfo::RetryIn(std::chrono::seconds{30}); } try { FlowFilePtrVec result; result.reserve(flow_files.size()); rocksdb::ReadOptions read_options; std::vector<utils::SmallString<36>> serialized_keys; serialized_keys.reserve(flow_files.size()); for (const auto& item : flow_files) { serialized_keys.push_back(item.id.to_string()); } std::vector<rocksdb::Slice> keys; keys.reserve(flow_files.size()); for (size_t idx = 0; idx < flow_files.size(); ++idx) { keys.emplace_back(serialized_keys[idx].data(), serialized_keys[idx].length()); } std::vector<std::string> serialized_items; serialized_items.reserve(flow_files.size()); std::vector<rocksdb::Status> statuses = opendb->MultiGet(read_options, keys, &serialized_items); for (size_t idx = 0; idx < statuses.size(); ++idx) { if (!statuses[idx].ok()) { logger_->log_error("Failed to fetch flow file \"%s\"", serialized_keys[idx]); return utils::TaskRescheduleInfo::RetryIn(std::chrono::seconds{30}); } utils::Identifier container_id; auto flow_file = FlowFileRecord::DeSerialize( gsl::make_span(serialized_items[idx]).as_span<const std::byte>(), content_repo_, container_id); if (!flow_file) { // corrupted flow file logger_->log_error("Failed to deserialize flow file \"%s\"", serialized_keys[idx]); } else { flow_file->setStoredToRepository(true); flow_file->setPenaltyExpiration(flow_files[idx].to_be_processed_after); result.push_back(std::move(flow_file)); logger_->log_debug("Deserialized flow file \"%s\"", serialized_keys[idx]); } } output->set_value(result); return utils::TaskRescheduleInfo::Done(); } catch (const std::exception& err) { logger_->log_error("Error while swapping flow files in: %s", err.what()); return utils::TaskRescheduleInfo::RetryIn(std::chrono::seconds{60}); } } } // namespace org::apache::nifi::minifi