extensions/standard-processors/processors/GetFile.cpp (151 lines of code) (raw):

/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "GetFile.h"

#include <exception>
#include <filesystem>
#include <memory>
#include <queue>
#include <string>
#include <system_error>

#include "ListFile.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "core/TypedValues.h"
#include "utils/RegexUtils.h"
#include "utils/StringUtils.h"
#include "utils/TimeUtil.h"
#include "utils/ConfigurationUtils.h"
#include "utils/file/FileReaderCallback.h"
#include "utils/file/FileUtils.h"
#include "utils/ProcessorConfigUtils.h"

using namespace std::literals::chrono_literals;

namespace org::apache::nifi::minifi::processors {

/// Registers the processor's supported properties and relationships.
void GetFile::initialize() {
  setSupportedProperties(Properties);
  setSupportedRelationships(Relationships);
}

/// Reads the processor configuration into request_ and validates it.
///
/// @throws Exception (PROCESS_SCHEDULE_EXCEPTION) if the Directory property is missing or does not
///         name a directory, or if the File Filter is not a valid regular expression.
void GetFile::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
  buffer_size_ = utils::configuration::getBufferSize(*context.getConfiguration());

  request_.batchSize = utils::parseU64Property(context, BatchSize);
  request_.ignoreHiddenFile = utils::parseBoolProperty(context, IgnoreHiddenFile);
  request_.keepSourceFile = utils::parseBoolProperty(context, KeepSourceFile);
  request_.maxAge = utils::parseDurationProperty(context, MaxAge);
  request_.minAge = utils::parseDurationProperty(context, MinAge);
  request_.maxSize = utils::parseDataSizeProperty(context, MaxSize);
  request_.minSize = utils::parseDataSizeProperty(context, MinSize);
  request_.pollInterval = utils::parseDurationProperty(context, PollInterval);
  request_.recursive = utils::parseBoolProperty(context, Recurse);
  request_.fileFilter = utils::parseProperty(context, FileFilter);

  // Compile the filter once up front. Otherwise an invalid pattern is only discovered while
  // listing, where it would fail on every poll instead of failing scheduling once.
  try {
    [[maybe_unused]] utils::Regex validated_filter(request_.fileFilter);
  } catch (const std::exception& regex_error) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION,
        utils::string::join_pack("File Filter \"", request_.fileFilter, "\" is not a valid regular expression: ", regex_error.what()));
  }

  if (auto directory_str = context.getProperty(Directory, nullptr)) {
    if (!utils::file::is_directory(*directory_str)) {
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, utils::string::join_pack("Input Directory \"", *directory_str, "\" is not a directory"));
    }
    request_.inputDirectory = *directory_str;
  } else {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Input Directory property is missing");
  }
}

/// Refills the in-memory listing and emits queued files as flow files.
///
/// The directory is re-listed only when the queued listing is empty and either pollInterval is 0
/// or more than pollInterval has passed since the last listing. Up to batchSize queued files
/// (all of them when batchSize is 0) are then turned into flow files. Yields if nothing is queued.
void GetFile::onTrigger(core::ProcessContext&, core::ProcessSession& session) {
  const bool listing_empty_before_poll = isListingEmpty();
  logger_->log_debug("Listing is {} before polling directory", listing_empty_before_poll ? "empty" : "not empty");
  if (listing_empty_before_poll) {
    const bool poll_due = request_.pollInterval == 0ms
        || (std::chrono::system_clock::now() - last_listing_time_.load()) > request_.pollInterval;
    if (poll_due) {
      performListing(request_);
      last_listing_time_.store(std::chrono::system_clock::now());
    }
  }

  const bool listing_empty_after_poll = isListingEmpty();
  logger_->log_debug("Listing is {} after polling directory", listing_empty_after_poll ? "empty" : "not empty");
  if (listing_empty_after_poll) {
    yield();
    return;
  }

  std::queue<std::filesystem::path> list_of_file_names = pollListing(request_.batchSize);
  while (!list_of_file_names.empty()) {
    auto file_name = list_of_file_names.front();
    list_of_file_names.pop();
    getSingleFile(session, file_name);
  }
}

/// Creates a flow file from one listed file and transfers it to Success.
///
/// Sets the filename, absolute.path and path (relative to the input directory) attributes, and
/// copies the file contents into the flow file. Unless keepSourceFile is set, the source file is
/// then deleted. An IO error while reading the file is logged, and the flow file is marked deleted.
void GetFile::getSingleFile(core::ProcessSession& session, const std::filesystem::path& file_path) const {
  logger_->log_info("GetFile process {}", file_path);
  auto flow_file = session.create();
  gsl_Expects(flow_file);

  flow_file->setAttribute(core::SpecialFlowAttribute::FILENAME, file_path.filename().string());
  // Appending "" gives the directory paths a trailing separator.
  flow_file->setAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH, std::filesystem::absolute(file_path.parent_path() / "").string());
  auto relative_path = std::filesystem::relative(file_path.parent_path(), request_.inputDirectory);
  flow_file->setAttribute(core::SpecialFlowAttribute::PATH, (relative_path / "").string());

  try {
    session.write(flow_file, utils::FileReaderCallback{file_path, buffer_size_});
    session.transfer(flow_file, Success);
    if (!request_.keepSourceFile) {
      std::error_code remove_error;
      const bool removed = std::filesystem::remove(file_path, remove_error);
      if (remove_error) {
        logger_->log_error("GetFile could not delete file '{}', error: {}", file_path, remove_error.message());
      } else if (!removed) {
        // remove() reports "did not exist" as false with no error code. Logging
        // remove_error.message() here would print a misleading "Success".
        logger_->log_warn("GetFile could not delete file '{}': it no longer exists", file_path);
      }
    }
  } catch (const utils::FileReaderCallbackIOError& io_error) {
    logger_->log_error("IO error while processing file '{}': {}", file_path, io_error.what());
    flow_file->setDeleted(true);
  }
}

/// Returns whether the queued directory listing is empty (guarded by directory_listing_mutex_).
bool GetFile::isListingEmpty() const {
  std::lock_guard<std::mutex> lock(directory_listing_mutex_);
  return directory_listing_.empty();
}

/// Appends one file path to the queued directory listing (guarded by directory_listing_mutex_).
void GetFile::putListing(const std::filesystem::path& file_path) {
  logger_->log_trace("Adding file to queue: {}", file_path);
  std::lock_guard<std::mutex> lock(directory_listing_mutex_);
  directory_listing_.push(file_path);
}

/// Removes and returns up to batch_size paths from the front of the queued listing.
/// A batch_size of 0 drains the whole listing.
std::queue<std::filesystem::path> GetFile::pollListing(uint64_t batch_size) {
  std::lock_guard<std::mutex> lock(directory_listing_mutex_);
  std::queue<std::filesystem::path> list;
  while (!directory_listing_.empty() && (batch_size == 0 || list.size() < batch_size)) {
    list.push(directory_listing_.front());
    directory_listing_.pop();
  }
  return list;
}

/// Decides whether a file found during listing should be picked up.
///
/// @param full_name path of the file (directory joined with name)
/// @param name      file name, matched against request.fileFilter
/// @param request   filter settings; zero-valued size/age bounds are treated as "no bound"
/// @return true if the file passes the hidden-file, name, size and age filters. It returns false
///         if any filter rejects it or its size/modification time cannot be read.
///
/// On acceptance, the GetFileMetrics input_bytes/accepted_files counters are incremented. Note
/// that this happens at listing time, before the file is actually read.
bool GetFile::fileMatchesRequestCriteria(const std::filesystem::path& full_name, const std::filesystem::path& name, const GetFileRequest &request) {
  logger_->log_trace("Checking file: {}", full_name);

  // Name-based filters run first, so rejected files cost no stat calls and produce no stat-error logs.
  if (request.ignoreHiddenFile && utils::file::is_hidden(full_name)) {
    return false;
  }
  // NOTE(review): the pattern is recompiled for every candidate file. Compiling it once per
  // listing would need a signature/member change in GetFile.h.
  utils::Regex file_filter(request.fileFilter);
  if (!utils::regexMatch(name.string(), file_filter)) {
    return false;
  }

  std::error_code ec;
  const uint64_t file_size = std::filesystem::file_size(full_name, ec);
  if (ec) {
    logger_->log_error("file_size of {}: {}", full_name, ec.message());
    return false;
  }
  const auto modified_time = std::filesystem::last_write_time(full_name, ec);
  if (ec) {
    logger_->log_error("last_write_time of {}: {}", full_name, ec.message());
    return false;
  }

  if (request.minSize > 0 && file_size < request.minSize) {
    return false;
  }
  if (request.maxSize > 0 && file_size > request.maxSize) {
    return false;
  }

  const auto file_age = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::file_clock::now() - modified_time);
  if (request.minAge > 0ms && file_age < request.minAge) {
    return false;
  }
  if (request.maxAge > 0ms && file_age > request.maxAge) {
    return false;
  }

  auto* const getfile_metrics = dynamic_cast<GetFileMetrics*>(metrics_.get());
  gsl_Assert(getfile_metrics);
  getfile_metrics->input_bytes += file_size;
  ++getfile_metrics->accepted_files;
  return true;
}

/// Lists request.inputDirectory (recursively when request.recursive is set) and queues every file
/// that passes fileMatchesRequestCriteria.
///
/// The callback returns isRunning(); presumably list_dir stops walking once that is false
/// (NOTE(review): confirm against utils::file::list_dir).
void GetFile::performListing(const GetFileRequest &request) {
  auto callback = [this, request](const std::filesystem::path& dir, const std::filesystem::path& filename) -> bool {
    auto fullpath = dir / filename;
    if (fileMatchesRequestCriteria(fullpath, filename, request)) {
      putListing(fullpath);
    }
    return isRunning();
  };
  utils::file::list_dir(request.inputDirectory, callback, logger_, request.recursive);
}

REGISTER_RESOURCE(GetFile, Processor);

}  // namespace org::apache::nifi::minifi::processors