extensions/standard-processors/processors/GetFile.cpp (151 lines of code) (raw):
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "GetFile.h"
#include <memory>
#include <queue>
#include <string>
#include "ListFile.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "core/TypedValues.h"
#include "utils/RegexUtils.h"
#include "utils/StringUtils.h"
#include "utils/TimeUtil.h"
#include "utils/ConfigurationUtils.h"
#include "utils/file/FileReaderCallback.h"
#include "utils/file/FileUtils.h"
#include "utils/ProcessorConfigUtils.h"
using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::processors {
void GetFile::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
/// Reads the processor properties into request_ (and the I/O buffer size into buffer_size_) and validates them.
/// Throws Exception(PROCESS_SCHEDULE_EXCEPTION) when the File Filter cannot be compiled as a regex,
/// when the Input Directory property is missing, or when it does not name a directory.
void GetFile::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
  buffer_size_ = utils::configuration::getBufferSize(*context.getConfiguration());
  request_.batchSize = utils::parseU64Property(context, BatchSize);
  request_.ignoreHiddenFile = utils::parseBoolProperty(context, IgnoreHiddenFile);
  request_.keepSourceFile = utils::parseBoolProperty(context, KeepSourceFile);
  request_.maxAge = utils::parseDurationProperty(context, MaxAge);
  request_.minAge = utils::parseDurationProperty(context, MinAge);
  request_.maxSize = utils::parseDataSizeProperty(context, MaxSize);
  request_.minSize = utils::parseDataSizeProperty(context, MinSize);
  request_.pollInterval = utils::parseDurationProperty(context, PollInterval);
  request_.recursive = utils::parseBoolProperty(context, Recurse);
  request_.fileFilter = utils::parseProperty(context, FileFilter);
  // fileMatchesRequestCriteria compiles this pattern for every listed file. Compiling it once here makes an
  // invalid pattern fail scheduling, instead of failing repeatedly during listing.
  // (Assumes utils::Regex throws on an invalid pattern; if it does not, this block is a no-op.)
  try {
    [[maybe_unused]] utils::Regex file_filter_check(request_.fileFilter);
  } catch (const std::exception& ex) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION,
        utils::string::join_pack("File Filter \"", request_.fileFilter, "\" is not a valid regular expression: ", std::string{ex.what()}));
  }
  if (auto directory_str = context.getProperty(Directory, nullptr)) {
    if (!utils::file::is_directory(*directory_str)) {
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, utils::string::join_pack("Input Directory \"", *directory_str, "\" is not a directory"));
    }
    request_.inputDirectory = *directory_str;
  } else {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Input Directory property is missing");
  }
}
/// Processes one batch of files from the in-memory listing.
/// The input directory is re-scanned only when the listing has been fully drained and either the
/// poll interval is zero or more than pollInterval has passed since the previous scan.
/// Yields if no files are queued after that step; otherwise hands up to batchSize files
/// (all of them when batchSize is 0) to getSingleFile.
void GetFile::onTrigger(core::ProcessContext&, core::ProcessSession& session) {
  const bool listing_was_empty = isListingEmpty();
  logger_->log_debug("Listing is {} before polling directory", listing_was_empty ? "empty" : "not empty");

  // Short-circuiting keeps the clock read out of the picture unless a scan is actually being considered.
  if (listing_was_empty &&
      (request_.pollInterval == 0ms || (std::chrono::system_clock::now() - last_listing_time_.load()) > request_.pollInterval)) {
    performListing(request_);
    last_listing_time_.store(std::chrono::system_clock::now());
  }

  const bool listing_is_empty = isListingEmpty();
  logger_->log_debug("Listing is {} after polling directory", listing_is_empty ? "empty" : "not empty");
  if (listing_is_empty) {
    yield();
    return;
  }

  for (auto batch = pollListing(request_.batchSize); !batch.empty(); batch.pop()) {
    getSingleFile(session, batch.front());
  }
}
/// Creates a flow file from the file at file_path and transfers it to Success.
/// Sets the FILENAME, ABSOLUTE_PATH and PATH attributes; PATH is relative to the configured input directory.
/// Unless Keep Source File is set, the source file is deleted after the transfer.
/// On a FileReaderCallbackIOError the error is logged and the flow file is marked deleted.
void GetFile::getSingleFile(core::ProcessSession& session, const std::filesystem::path& file_path) const {
  logger_->log_info("GetFile process {}", file_path);
  auto flow_file = session.create();
  gsl_Expects(flow_file);
  flow_file->setAttribute(core::SpecialFlowAttribute::FILENAME, file_path.filename().string());
  // Appending an empty path component makes the attribute value end with a directory separator.
  flow_file->setAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH, std::filesystem::absolute(file_path.parent_path() / "").string());
  auto relative_path = std::filesystem::relative(file_path.parent_path(), request_.inputDirectory);
  flow_file->setAttribute(core::SpecialFlowAttribute::PATH, (relative_path / "").string());
  try {
    session.write(flow_file, utils::FileReaderCallback{file_path, buffer_size_});
    session.transfer(flow_file, Success);
    if (!request_.keepSourceFile) {
      std::error_code remove_error;
      const bool removed = std::filesystem::remove(file_path, remove_error);
      if (remove_error) {
        logger_->log_error("GetFile could not delete file '{}', error: {}", file_path, remove_error.message());
      } else if (!removed) {
        // remove() returns false without setting an error when the file no longer exists
        // (e.g. it was deleted by someone else after being read), so there is no error message to report.
        logger_->log_warn("GetFile could not delete file '{}': it no longer exists", file_path);
      }
    }
  } catch (const utils::FileReaderCallbackIOError& io_error) {
    logger_->log_error("IO error while processing file '{}': {}", file_path, io_error.what());
    flow_file->setDeleted(true);
  }
}
/// Returns whether the pending file queue is empty; the queue is read under directory_listing_mutex_.
bool GetFile::isListingEmpty() const {
  const std::scoped_lock guard{directory_listing_mutex_};
  return directory_listing_.empty();
}
/// Appends file_path to the back of the pending file queue (under directory_listing_mutex_).
void GetFile::putListing(const std::filesystem::path& file_path) {
  logger_->log_trace("Adding file to queue: {}", file_path);
  const std::scoped_lock guard{directory_listing_mutex_};
  directory_listing_.emplace(file_path);
}
/// Removes up to batch_size paths from the front of the pending queue and returns them in FIFO order.
/// A batch_size of 0 means "take everything currently queued". Runs under directory_listing_mutex_.
std::queue<std::filesystem::path> GetFile::pollListing(uint64_t batch_size) {
  const std::scoped_lock guard{directory_listing_mutex_};
  std::queue<std::filesystem::path> batch;
  const bool unlimited = (batch_size == 0);
  while (!directory_listing_.empty()) {
    if (!unlimited && batch.size() >= batch_size) {
      break;
    }
    // The element is popped right after, so moving it out is safe.
    batch.push(std::move(directory_listing_.front()));
    directory_listing_.pop();
  }
  return batch;
}
bool GetFile::fileMatchesRequestCriteria(const std::filesystem::path& full_name, const std::filesystem::path& name, const GetFileRequest &request) {
logger_->log_trace("Checking file: {}", full_name);
std::error_code ec;
uint64_t file_size = std::filesystem::file_size(full_name, ec);
if (ec) {
logger_->log_error("file_size of {}: {}", full_name, ec.message());
return false;
}
const auto modifiedTime = std::filesystem::last_write_time(full_name, ec);
if (ec) {
logger_->log_error("last_write_time of {}: {}", full_name, ec.message());
return false;
}
if (request.minSize > 0 && file_size < request.minSize)
return false;
if (request.maxSize > 0 && file_size > request.maxSize)
return false;
auto fileAge = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::file_clock::now() - modifiedTime);
if (request.minAge > 0ms && fileAge < request.minAge)
return false;
if (request.maxAge > 0ms && fileAge > request.maxAge)
return false;
if (request.ignoreHiddenFile && utils::file::is_hidden(full_name))
return false;
utils::Regex rgx(request.fileFilter);
if (!utils::regexMatch(name.string(), rgx)) {
return false;
}
auto* const getfile_metrics = dynamic_cast<GetFileMetrics*>(metrics_.get());
gsl_Assert(getfile_metrics);
getfile_metrics->input_bytes += file_size;
++getfile_metrics->accepted_files;
return true;
}
/// Walks request.inputDirectory (recursively if request.recursive) and queues every file
/// accepted by fileMatchesRequestCriteria. The walk continues only while isRunning() is true.
void GetFile::performListing(const GetFileRequest &request) {
  const auto on_file = [this, request](const std::filesystem::path& directory, const std::filesystem::path& file_name) -> bool {
    if (const auto candidate = directory / file_name; fileMatchesRequestCriteria(candidate, file_name, request)) {
      putListing(candidate);
    }
    // The callback's return value tells list_dir whether to keep going.
    return isRunning();
  };
  utils::file::list_dir(request.inputDirectory, on_file, logger_, request.recursive);
}
// Registers GetFile with the framework as a Processor resource.
// (REGISTER_RESOURCE presumably comes from core/Resource.h; confirm the exact registration semantics there.)
REGISTER_RESOURCE(GetFile, Processor);
} // namespace org::apache::nifi::minifi::processors