extensions/standard-processors/processors/GetFile.h (158 lines of code) (raw):
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <memory>
#include <queue>
#include <string>
#include <vector>
#include <atomic>
#include <utility>
#include "core/Processor.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/PropertyDefinition.h"
#include "core/PropertyDefinitionBuilder.h"
#include "minifi-cpp/core/PropertyValidator.h"
#include "core/RelationshipDefinition.h"
#include "core/Core.h"
#include "core/logging/LoggerFactory.h"
#include "utils/Export.h"
#include "core/ProcessorMetrics.h"
namespace org::apache::nifi::minifi::processors {
struct GetFileRequest {
bool recursive = true;
bool keepSourceFile = false;
std::chrono::milliseconds minAge{0};
std::chrono::milliseconds maxAge{0};
uint64_t minSize = 0;
uint64_t maxSize = 0;
bool ignoreHiddenFile = true;
std::chrono::milliseconds pollInterval{0};
uint64_t batchSize = 10;
std::string fileFilter = ".*";
std::filesystem::path inputDirectory;
};
class GetFileMetrics : public core::ProcessorMetricsImpl {
public:
explicit GetFileMetrics(const core::Processor& source_processor)
: core::ProcessorMetricsImpl(source_processor) {
}
std::vector<state::response::SerializedResponseNode> serialize() override {
auto resp = core::ProcessorMetricsImpl::serialize();
auto& root_node = resp[0];
state::response::SerializedResponseNode accepted_files_node{"AcceptedFiles", accepted_files.load()};
root_node.children.push_back(accepted_files_node);
state::response::SerializedResponseNode input_bytes_node{"InputBytes", input_bytes.load()};
root_node.children.push_back(input_bytes_node);
return resp;
}
std::vector<state::PublishedMetric> calculateMetrics() override {
auto metrics = core::ProcessorMetricsImpl::calculateMetrics();
metrics.push_back({"accepted_files", static_cast<double>(accepted_files.load()), getCommonLabels()});
metrics.push_back({"input_bytes", static_cast<double>(input_bytes.load()), getCommonLabels()});
return metrics;
}
std::atomic<uint32_t> accepted_files{0};
std::atomic<uint64_t> input_bytes{0};
};
class GetFile : public core::ProcessorImpl {
public:
explicit GetFile(const std::string_view name, const utils::Identifier& uuid = {})
: ProcessorImpl(name, uuid) {
metrics_ = gsl::make_not_null(std::make_shared<GetFileMetrics>(*this));
logger_ = core::logging::LoggerFactory<GetFile>::getLogger(uuid_);
}
~GetFile() override = default;
EXTENSIONAPI static constexpr const char* Description = "Creates FlowFiles from files in a directory. MiNiFi will ignore files for which it doesn't have read permissions.";
EXTENSIONAPI static constexpr auto Directory = core::PropertyDefinitionBuilder<>::createProperty("Input Directory")
.withDescription("The input directory from which to pull files")
.isRequired(true)
.supportsExpressionLanguage(true)
.withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR)
.build();
EXTENSIONAPI static constexpr auto Recurse = core::PropertyDefinitionBuilder<>::createProperty("Recurse Subdirectories")
.withDescription("Indicates whether or not to pull files from subdirectories")
.withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR)
.withDefaultValue("true")
.build();
EXTENSIONAPI static constexpr auto KeepSourceFile = core::PropertyDefinitionBuilder<>::createProperty("Keep Source File")
.withDescription("If true, the file is not deleted after it has been copied to the Content Repository")
.withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR)
.withDefaultValue("false")
.build();
EXTENSIONAPI static constexpr auto MinAge = core::PropertyDefinitionBuilder<>::createProperty("Minimum File Age")
.withDescription("The minimum age that a file must be in order to be pulled;"
" any file younger than this amount of time (according to last modification date) will be ignored")
.withValidator(core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR)
.withDefaultValue("0 sec")
.build();
EXTENSIONAPI static constexpr auto MaxAge = core::PropertyDefinitionBuilder<>::createProperty("Maximum File Age")
.withDescription("The maximum age that a file must be in order to be pulled;"
" any file older than this amount of time (according to last modification date) will be ignored")
.withValidator(core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR)
.withDefaultValue("0 sec")
.build();
EXTENSIONAPI static constexpr auto MinSize = core::PropertyDefinitionBuilder<>::createProperty("Minimum File Size")
.withDescription("The minimum size that a file can be in order to be pulled")
.withValidator(core::StandardPropertyValidators::DATA_SIZE_VALIDATOR)
.withDefaultValue("0 B")
.build();
EXTENSIONAPI static constexpr auto MaxSize = core::PropertyDefinitionBuilder<>::createProperty("Maximum File Size")
.withDescription("The maximum size that a file can be in order to be pulled")
.withValidator(core::StandardPropertyValidators::DATA_SIZE_VALIDATOR)
.withDefaultValue("0 B")
.build();
EXTENSIONAPI static constexpr auto IgnoreHiddenFile = core::PropertyDefinitionBuilder<>::createProperty("Ignore Hidden Files")
.withDescription("Indicates whether or not hidden files should be ignored")
.withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR)
.withDefaultValue("true")
.build();
EXTENSIONAPI static constexpr auto PollInterval = core::PropertyDefinitionBuilder<>::createProperty("Polling Interval")
.withDescription("Indicates how long to wait before performing a directory listing")
.withValidator(core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR)
.withDefaultValue("0 sec")
.build();
EXTENSIONAPI static constexpr auto BatchSize = core::PropertyDefinitionBuilder<>::createProperty("Batch Size")
.withDescription("The maximum number of files to pull in each iteration")
.withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR)
.withDefaultValue("10")
.build();
EXTENSIONAPI static constexpr auto FileFilter = core::PropertyDefinitionBuilder<>::createProperty("File Filter")
.withDescription("Only files whose names match the given regular expression will be picked up")
.withDefaultValue(".*")
.build();
EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({
Directory,
Recurse,
KeepSourceFile,
MinAge,
MaxAge,
MinSize,
MaxSize,
IgnoreHiddenFile,
PollInterval,
BatchSize,
FileFilter
});
EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "All files are routed to success"};
EXTENSIONAPI static constexpr auto Relationships = std::array{Success};
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_FORBIDDEN;
EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override;
void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
void initialize() override;
/**
* performs a listing on the directory.
* @param request get file request.
*/
void performListing(const GetFileRequest &request);
private:
bool isListingEmpty() const;
void putListing(const std::filesystem::path& file_path);
std::queue<std::filesystem::path> pollListing(uint64_t batch_size);
bool fileMatchesRequestCriteria(const std::filesystem::path& full_name, const std::filesystem::path& name, const GetFileRequest &request);
void getSingleFile(core::ProcessSession& session, const std::filesystem::path& file_path) const;
GetFileRequest request_;
std::queue<std::filesystem::path> directory_listing_;
mutable std::mutex directory_listing_mutex_;
std::atomic<std::chrono::time_point<std::chrono::system_clock>> last_listing_time_{};
size_t buffer_size_{};
};
} // namespace org::apache::nifi::minifi::processors