extensions/standard-processors/processors/GetFile.cpp (165 lines of code) (raw):

/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "GetFile.h"

#include <chrono>
#include <filesystem>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <system_error>
#include <utility>

#include "utils/StringUtils.h"
#include "utils/file/FileUtils.h"
#include "utils/TimeUtil.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "core/TypedValues.h"
#include "utils/FileReaderCallback.h"
#include "utils/RegexUtils.h"

using namespace std::literals::chrono_literals;

namespace org::apache::nifi::minifi::processors {

/// Registers the processor's supported properties and relationships.
void GetFile::initialize() {
  setSupportedProperties(Properties);
  setSupportedRelationships(Relationships);
}

/**
 * Copies the configured property values into request_, which drives every later listing.
 *
 * Properties that are absent leave the corresponding request_ field untouched. Boolean properties whose
 * value cannot be parsed fall back to the defaults given in the value_or() calls below.
 *
 * @throws Exception (PROCESS_SCHEDULE_EXCEPTION) if the Input Directory property is missing or does not
 *         name an existing directory.
 */
void GetFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory* /*sessionFactory*/) {
  // Scratch buffer reused for every string-valued property read below.
  std::string value;
  if (context->getProperty(BatchSize, value)) {
    core::Property::StringToInt(value, request_.batchSize);
  }
  if (context->getProperty(IgnoreHiddenFile, value)) {
    request_.ignoreHiddenFile = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(true);
  }
  if (context->getProperty(KeepSourceFile, value)) {
    request_.keepSourceFile = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
  }
  if (auto max_age = context->getProperty<core::TimePeriodValue>(MaxAge)) {
    request_.maxAge = max_age->getMilliseconds();
  }
  if (auto min_age = context->getProperty<core::TimePeriodValue>(MinAge)) {
    request_.minAge = min_age->getMilliseconds();
  }
  if (context->getProperty(MaxSize, value)) {
    core::Property::StringToInt(value, request_.maxSize);
  }
  if (context->getProperty(MinSize, value)) {
    core::Property::StringToInt(value, request_.minSize);
  }
  if (const auto poll_interval = context->getProperty<core::TimePeriodValue>(PollInterval)) {
    request_.pollInterval = poll_interval->getMilliseconds();
  }
  if (context->getProperty(Recurse, value)) {
    request_.recursive = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(true);
  }
  if (context->getProperty(FileFilter, value)) {
    request_.fileFilter = value;
  }

  if (auto directory_str = context->getProperty(Directory)) {
    if (!utils::file::is_directory(*directory_str)) {
      // Report the directory that failed the check. The scratch `value` still holds an unrelated property.
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Input Directory \"" + *directory_str + "\" is not a directory");
    }
    request_.inputDirectory = *directory_str;
  } else {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Input Directory property is missing");
  }
}

/**
 * Refills the pending listing if needed, then turns one batch of listed files into flow files.
 *
 * The input directory is only re-listed when the pending listing is empty. It must also be the case that
 * either no poll interval is configured (0ms) or more than pollInterval has passed since the last listing.
 * If nothing is pending after that, the processor yields. Otherwise up to request_.batchSize paths are
 * taken from the listing and processed; a batch size of 0 means all of them.
 */
void GetFile::onTrigger(core::ProcessContext* /*context*/, core::ProcessSession* session) {
  const bool is_dir_empty_before_poll = isListingEmpty();
  logger_->log_debug("Listing is %s before polling directory", is_dir_empty_before_poll ? "empty" : "not empty");
  if (is_dir_empty_before_poll) {
    if (request_.pollInterval == 0ms || (std::chrono::system_clock::now() - last_listing_time_.load()) > request_.pollInterval) {
      performListing(request_);
      last_listing_time_.store(std::chrono::system_clock::now());
    }
  }

  const bool is_dir_empty_after_poll = isListingEmpty();
  logger_->log_debug("Listing is %s after polling directory", is_dir_empty_after_poll ? "empty" : "not empty");
  if (is_dir_empty_after_poll) {
    yield();
    return;
  }

  std::queue<std::filesystem::path> list_of_file_names = pollListing(request_.batchSize);
  while (!list_of_file_names.empty()) {
    // Move the path out rather than copying it; the element is popped right afterwards.
    const auto file_name = std::move(list_of_file_names.front());
    list_of_file_names.pop();
    getSingleFile(*session, file_name);
  }
}

/**
 * Creates a flow file from one file on disk and transfers it to Success.
 *
 * Attributes set:
 *  - FILENAME: the file name.
 *  - ABSOLUTE_PATH: the absolute parent directory, with a trailing separator.
 *  - PATH: the parent directory relative to the input directory, with a trailing separator.
 *
 * Unless keepSourceFile is set, the source file is removed after the transfer; a failed removal is only
 * logged. If reading the file raises utils::FileReaderCallbackIOError, the error is logged and the flow file
 * is marked deleted.
 */
void GetFile::getSingleFile(core::ProcessSession& session, const std::filesystem::path& file_path) const {
  logger_->log_info("GetFile process %s", file_path.string());
  auto flow_file = session.create();
  gsl_Expects(flow_file);

  flow_file->setAttribute(core::SpecialFlowAttribute::FILENAME, file_path.filename().string());
  // Appending "" forces a trailing directory separator on the rendered path.
  flow_file->setAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH, std::filesystem::absolute(file_path.parent_path() / "").string());
  auto relative_path = std::filesystem::relative(file_path.parent_path(), request_.inputDirectory);
  flow_file->setAttribute(core::SpecialFlowAttribute::PATH, (relative_path / "").string());

  try {
    session.write(flow_file, utils::FileReaderCallback{file_path});
    session.transfer(flow_file, Success);
    if (!request_.keepSourceFile) {
      std::error_code remove_error;
      if (!std::filesystem::remove(file_path, remove_error)) {
        logger_->log_error("GetFile could not delete file '%s', error: %s", file_path.string(), remove_error.message());
      }
    }
  } catch (const utils::FileReaderCallbackIOError& io_error) {
    logger_->log_error("IO error while processing file '%s': %s", file_path.string(), io_error.what());
    flow_file->setDeleted(true);
  }
}

/// Returns whether the pending directory listing is empty. Access is guarded by directory_listing_mutex_.
bool GetFile::isListingEmpty() const {
  std::lock_guard<std::mutex> lock(directory_listing_mutex_);
  return directory_listing_.empty();
}

/// Appends a path to the pending directory listing. Access is guarded by directory_listing_mutex_.
void GetFile::putListing(const std::filesystem::path& file_path) {
  logger_->log_trace("Adding file to queue: %s", file_path.string());
  std::lock_guard<std::mutex> lock(directory_listing_mutex_);
  directory_listing_.push(file_path);
}

/**
 * Removes up to batch_size paths from the front of the pending listing and returns them in order.
 *
 * @param batch_size maximum number of paths to take; 0 means take everything currently listed.
 */
std::queue<std::filesystem::path> GetFile::pollListing(uint64_t batch_size) {
  std::lock_guard<std::mutex> lock(directory_listing_mutex_);
  std::queue<std::filesystem::path> list;
  while (!directory_listing_.empty() && (batch_size == 0 || list.size() < batch_size)) {
    list.push(directory_listing_.front());
    directory_listing_.pop();
  }
  return list;
}

/**
 * Decides whether a listed file satisfies the request's selection criteria.
 *
 * @param full_name path of the candidate file, used for stat calls and the hidden-file check.
 * @param name file name only; this is what the File Filter regex is matched against.
 * @param request the selection criteria to apply.
 * @return true if every criterion passes, and false if any fails or the file cannot be stat'ed.
 *         On acceptance the GetFile metrics (input bytes, accepted files) are updated.
 */
bool GetFile::fileMatchesRequestCriteria(const std::filesystem::path& full_name, const std::filesystem::path& name, const GetFileRequest &request) {
  logger_->log_trace("Checking file: %s", full_name.string());
  std::error_code ec;
  const uint64_t file_size = std::filesystem::file_size(full_name, ec);
  if (ec) {
    logger_->log_error("file_size of %s: %s", full_name.string(), ec.message());
    return false;
  }
  const auto modifiedTime = std::filesystem::last_write_time(full_name, ec);
  if (ec) {
    logger_->log_error("last_write_time of %s: %s", full_name.string(), ec.message());
    return false;
  }

  // For both the size and the age criteria, a bound of 0 disables that bound.
  if (request.minSize > 0 && file_size < request.minSize)
    return false;
  if (request.maxSize > 0 && file_size > request.maxSize)
    return false;

  const auto fileAge = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::file_clock::now() - modifiedTime);
  if (request.minAge > 0ms && fileAge < request.minAge)
    return false;
  if (request.maxAge > 0ms && fileAge > request.maxAge)
    return false;

  if (request.ignoreHiddenFile && utils::file::is_hidden(full_name))
    return false;

  // NOTE(review): the filter regex is rebuilt for every candidate file. Caching the compiled regex
  // (e.g. once per listing) would avoid repeated compilation, but that needs a header change.
  utils::Regex rgx(request.fileFilter);
  if (!utils::regexMatch(name.string(), rgx)) {
    return false;
  }

  // Assumes metrics_ was created as a GetFileMetrics instance — TODO confirm in the constructor.
  auto* const getfile_metrics = static_cast<GetFileMetrics*>(metrics_.get());
  getfile_metrics->input_bytes += file_size;
  ++getfile_metrics->accepted_files;
  return true;
}

/**
 * Walks request.inputDirectory and queues every file that passes fileMatchesRequestCriteria.
 *
 * The walk descends into subdirectories when request.recursive is set. The callback's return value is
 * isRunning(); presumably list_dir stops early once the processor is no longer running — confirm in
 * utils::file::list_dir.
 */
void GetFile::performListing(const GetFileRequest &request) {
  auto callback = [this, request](const std::filesystem::path& dir, const std::filesystem::path& filename) -> bool {
    auto fullpath = dir / filename;
    if (fileMatchesRequestCriteria(fullpath, filename, request)) {
      putListing(fullpath);
    }
    return isRunning();
  };
  utils::file::list_dir(request.inputDirectory, callback, logger_, request.recursive);
}

REGISTER_RESOURCE(GetFile, Processor);

}  // namespace org::apache::nifi::minifi::processors