extensions/sftp/processors/ListSFTP.h (245 lines of code) (raw):
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <array>
#include <chrono>
#include <cstdint>
#include <filesystem>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <string_view>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>

#include "SFTPProcessorBase.h"
#include "core/Processor.h"
#include "core/ProcessSession.h"
#include "core/Property.h"
#include "core/PropertyDefinitionBuilder.h"
#include "core/PropertyType.h"
#include "utils/ArrayUtils.h"
#include "utils/Id.h"
#include "controllers/keyvalue/KeyValueStateStorage.h"
#include "utils/RegexUtils.h"
namespace org::apache::nifi::minifi::processors {

/**
 * Lists the files residing on an SFTP server.
 *
 * For each file found on the remote server a new FlowFile is created with its filename attribute
 * set to the remote file's name; FetchSFTP can then be used to fetch the files. The processor
 * accepts no incoming FlowFiles (INPUT_FORBIDDEN) and is declared single-threaded.
 *
 * The "Listing Strategy" property selects how new/updated entities are detected:
 *  - "Tracking Timestamps" (default): based on the timestamps of the remote entities.
 *  - "Tracking Entities": already-listed entities are remembered inside the
 *    "Entity Tracking Time Window"; an entity is listed if it is unknown, has a newer timestamp,
 *    or has a different size than the remembered one.
 */
class ListSFTP : public SFTPProcessorBase {
 public:
  // Allowed values of the "Listing Strategy" property.
  static constexpr std::string_view LISTING_STRATEGY_TRACKING_TIMESTAMPS = "Tracking Timestamps";
  static constexpr std::string_view LISTING_STRATEGY_TRACKING_ENTITIES = "Tracking Entities";

  // Allowed values of the "Target System Timestamp Precision" property.
  static constexpr std::string_view TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT = "Auto Detect";
  static constexpr std::string_view TARGET_SYSTEM_TIMESTAMP_PRECISION_MILLISECONDS = "Milliseconds";
  static constexpr std::string_view TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS = "Seconds";
  static constexpr std::string_view TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES = "Minutes";

  // Allowed values of the "Entity Tracking Initial Listing Target" property.
  static constexpr std::string_view ENTITY_TRACKING_INITIAL_LISTING_TARGET_TRACKING_TIME_WINDOW = "Tracking Time Window";
  static constexpr std::string_view ENTITY_TRACKING_INITIAL_LISTING_TARGET_ALL_AVAILABLE = "All Available";

  explicit ListSFTP(std::string name, const utils::Identifier& uuid = {});
  ~ListSFTP() override;

  EXTENSIONAPI static constexpr const char* Description = "Performs a listing of the files residing on an SFTP server. "
      "For each file that is found on the remote server, a new FlowFile will be created with "
      "the filename attribute set to the name of the file on the remote server. "
      "This can then be used in conjunction with FetchSFTP in order to fetch those files.";

  EXTENSIONAPI static constexpr auto ListingStrategy = core::PropertyDefinitionBuilder<2>::createProperty("Listing Strategy")
      .withDescription("Specify how to determine new/updated entities. See each strategy descriptions for detail.")
      .isRequired(true)
      .withAllowedValues({LISTING_STRATEGY_TRACKING_TIMESTAMPS, LISTING_STRATEGY_TRACKING_ENTITIES})
      .withDefaultValue(LISTING_STRATEGY_TRACKING_TIMESTAMPS)
      .build();
  EXTENSIONAPI static constexpr auto RemotePath = core::PropertyDefinitionBuilder<>::createProperty("Remote Path")
      .withDescription("The fully qualified filename on the remote system")
      .isRequired(false)
      .supportsExpressionLanguage(true)
      .build();
  EXTENSIONAPI static constexpr auto SearchRecursively = core::PropertyDefinitionBuilder<>::createProperty("Search Recursively")
      .withDescription("If true, will pull files from arbitrarily nested subdirectories; "
          "otherwise, will not traverse subdirectories")
      .isRequired(true)
      .withPropertyType(core::StandardPropertyTypes::BOOLEAN_TYPE)
      .withDefaultValue("false")
      .build();
  EXTENSIONAPI static constexpr auto FollowSymlink = core::PropertyDefinitionBuilder<>::createProperty("Follow symlink")
      .withDescription("If true, will pull even symbolic files and also nested symbolic subdirectories; "
          "otherwise, will not read symbolic files and will not traverse symbolic link subdirectories")
      .isRequired(true)
      .withPropertyType(core::StandardPropertyTypes::BOOLEAN_TYPE)
      .withDefaultValue("false")
      .build();
  EXTENSIONAPI static constexpr auto FileFilterRegex = core::PropertyDefinitionBuilder<>::createProperty("File Filter Regex")
      .withDescription("Provides a Java Regular Expression for filtering Filenames; "
          "if a filter is supplied, only files whose names match that Regular Expression will be fetched")
      .isRequired(false)
      .build();
  EXTENSIONAPI static constexpr auto PathFilterRegex = core::PropertyDefinitionBuilder<>::createProperty("Path Filter Regex")
      .withDescription("When Search Recursively is true, then only subdirectories whose path matches the given Regular Expression will be scanned")
      .isRequired(false)
      .build();
  EXTENSIONAPI static constexpr auto IgnoreDottedFiles = core::PropertyDefinitionBuilder<>::createProperty("Ignore Dotted Files")
      .withDescription("If true, files whose names begin with a dot (\".\") will be ignored")
      .isRequired(true)
      .withPropertyType(core::StandardPropertyTypes::BOOLEAN_TYPE)
      .withDefaultValue("true")
      .build();
  EXTENSIONAPI static constexpr auto TargetSystemTimestampPrecision = core::PropertyDefinitionBuilder<4>::createProperty("Target System Timestamp Precision")
      .withDescription("Specify timestamp precision at the target system. "
          "Since this processor uses timestamp of entities to decide which should be listed, "
          "it is crucial to use the right timestamp precision.")
      .isRequired(true)
      .withAllowedValues({
          TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT,
          TARGET_SYSTEM_TIMESTAMP_PRECISION_MILLISECONDS,
          TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS,
          TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES})
      .withDefaultValue(TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT)
      .build();
  EXTENSIONAPI static constexpr auto EntityTrackingTimeWindow = core::PropertyDefinitionBuilder<>::createProperty("Entity Tracking Time Window")
      .withDescription("Specify how long this processor should track already-listed entities. "
          "'Tracking Entities' strategy can pick any entity whose timestamp is inside the specified time window. "
          "For example, if set to '30 minutes', any entity having timestamp in recent 30 minutes will be the listing target when this processor runs. "
          "A listed entity is considered 'new/updated' and a FlowFile is emitted if one of following condition meets: "
          "1. does not exist in the already-listed entities, "
          "2. has newer timestamp than the cached entity, "
          "3. has different size than the cached entity. "
          "If a cached entity's timestamp becomes older than specified time window, that entity will be removed from the cached already-listed entities. "
          "Used by 'Tracking Entities' strategy.")
      .isRequired(false)
      .build();
  EXTENSIONAPI static constexpr auto EntityTrackingInitialListingTarget = core::PropertyDefinitionBuilder<2>::createProperty("Entity Tracking Initial Listing Target")
      .withDescription("Specify how initial listing should be handled. Used by 'Tracking Entities' strategy.")
      .withAllowedValues({
          ENTITY_TRACKING_INITIAL_LISTING_TARGET_TRACKING_TIME_WINDOW,
          ENTITY_TRACKING_INITIAL_LISTING_TARGET_ALL_AVAILABLE})
      .isRequired(false)
      .withDefaultValue(ENTITY_TRACKING_INITIAL_LISTING_TARGET_ALL_AVAILABLE)
      .build();
  EXTENSIONAPI static constexpr auto MinimumFileAge = core::PropertyDefinitionBuilder<>::createProperty("Minimum File Age")
      .withDescription("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
      .isRequired(true)
      .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE)
      .withDefaultValue("0 sec")
      .build();
  EXTENSIONAPI static constexpr auto MaximumFileAge = core::PropertyDefinitionBuilder<>::createProperty("Maximum File Age")
      .withDescription("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
      .isRequired(false)
      .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE)
      .build();
  EXTENSIONAPI static constexpr auto MinimumFileSize = core::PropertyDefinitionBuilder<>::createProperty("Minimum File Size")
      .withDescription("The minimum size that a file must be in order to be pulled")
      .isRequired(true)
      .withPropertyType(core::StandardPropertyTypes::DATA_SIZE_TYPE)
      .withDefaultValue("0 B")
      .build();
  // Typed as a data size, like "Minimum File Size", so malformed values are rejected by property
  // validation instead of reaching the size parsing in the processor.
  EXTENSIONAPI static constexpr auto MaximumFileSize = core::PropertyDefinitionBuilder<>::createProperty("Maximum File Size")
      .withDescription("The maximum size that a file must be in order to be pulled")
      .isRequired(false)
      .withPropertyType(core::StandardPropertyTypes::DATA_SIZE_TYPE)
      .build();

  // The common SFTP connection properties followed by the ListSFTP-specific ones.
  EXTENSIONAPI static constexpr auto Properties = utils::array_cat(SFTPProcessorBase::Properties, std::array<core::PropertyReference, 14>{
      ListingStrategy,
      RemotePath,
      SearchRecursively,
      FollowSymlink,
      FileFilterRegex,
      PathFilterRegex,
      IgnoreDottedFiles,
      TargetSystemTimestampPrecision,
      EntityTrackingTimeWindow,
      EntityTrackingInitialListingTarget,
      MinimumFileAge,
      MaximumFileAge,
      MinimumFileSize,
      MaximumFileSize
  });

  EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "All FlowFiles that are received are routed to success"};
  EXTENSIONAPI static constexpr auto Relationships = std::array{Success};

  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_FORBIDDEN;
  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;

  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS

  // Writes Attributes
  static constexpr char const* ATTRIBUTE_SFTP_REMOTE_HOST = "sftp.remote.host";
  static constexpr char const* ATTRIBUTE_SFTP_REMOTE_PORT = "sftp.remote.port";
  static constexpr char const* ATTRIBUTE_SFTP_LISTING_USER = "sftp.listing.user";
  static constexpr char const* ATTRIBUTE_FILE_OWNER = "file.owner";
  static constexpr char const* ATTRIBUTE_FILE_GROUP = "file.group";
  static constexpr char const* ATTRIBUTE_FILE_PERMISSIONS = "file.permissions";
  static constexpr char const* ATTRIBUTE_FILE_SIZE = "file.size";
  static constexpr char const* ATTRIBUTE_FILE_LASTMODIFIEDTIME = "file.lastModifiedTime";

  // Listing lag associated with the coarse timestamp precisions. Only "Seconds" and "Minutes" have
  // an entry. NOTE(review): the values (1000 / 60000) look like milliseconds, i.e. one second / one
  // minute -- confirm the unit against ListSFTP.cpp.
  static constexpr std::array<std::pair<std::string_view, uint64_t>, 2> LISTING_LAG_MAP = {{
    {ListSFTP::TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS, 1000},
    {ListSFTP::TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES, 60000},
  }};

  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
  void initialize() override;
  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;

 private:
  // NOTE(review): presumably a non-owning pointer to the state manager obtained from the context -- confirm.
  core::StateManager* state_manager_{};

  // Values of the processor properties (presumably read in onSchedule).
  std::string listing_strategy_;
  bool search_recursively_{};
  bool follow_symlink_{};
  std::string file_filter_regex_;
  std::string path_filter_regex_;
  // Compiled counterparts of file_filter_regex_ / path_filter_regex_; presumably empty when the
  // corresponding property is not set.
  std::optional<utils::Regex> compiled_file_filter_regex_;
  std::optional<utils::Regex> compiled_path_filter_regex_;
  bool ignore_dotted_files_{};
  std::string target_system_timestamp_precision_;
  std::string entity_tracking_initial_listing_target_;
  std::chrono::milliseconds minimum_file_age_{};
  std::chrono::milliseconds maximum_file_age_{};
  uint64_t minimum_file_size_{};
  uint64_t maximum_file_size_{};

  // Parameters of the previous listing. NOTE(review): presumably compared with the current
  // parameters to decide when the cached listing state must be dropped (invalidateCache) -- confirm.
  std::string last_listing_strategy_;
  std::string last_hostname_;
  std::string last_username_;
  std::filesystem::path last_remote_path_;

  // One entry of a remote directory listing, built from the
  // (filename, longentry, attrs) tuple describing a child of a listed directory.
  struct Child {
    Child() = default;
    Child(const std::string& parent_path_, std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>&& sftp_child);

    // Remote path of the entry (presumably parent_path joined with filename).
    [[nodiscard]] std::string getPath() const;

    bool directory{false};
    std::filesystem::path parent_path;
    std::filesystem::path filename;
    LIBSSH2_SFTP_ATTRIBUTES attrs{};
  };

  // Listing state carried between triggers. NOTE(review): which strategy uses which member is
  // inferred from the names only -- confirm in ListSFTP.cpp.
  bool already_loaded_from_cache_{};
  std::chrono::steady_clock::time_point last_run_time_{};
  std::optional<std::chrono::system_clock::time_point> last_listed_latest_entry_timestamp_;
  std::optional<std::chrono::system_clock::time_point> last_processed_latest_entry_timestamp_;
  std::set<std::string> latest_identifiers_processed_;
  bool initial_listing_complete_{};

  // Timestamp and size remembered for an already-listed entity; the "Tracking Entities" strategy
  // lists an entity again when its timestamp is newer or its size differs.
  // NOTE(review): timestamp unit not visible here -- confirm in ListSFTP.cpp.
  struct ListedEntity {
    // Default member initializers keep both fields defined regardless of how the out-of-line
    // constructors are implemented.
    uint64_t timestamp{};
    uint64_t size{};

    ListedEntity();
    ListedEntity(uint64_t timestamp, uint64_t size);
  };
  // Already-listed entities. NOTE(review): presumably keyed by the entity's remote path -- confirm.
  std::unordered_map<std::string, ListedEntity> already_listed_entities_;

  void invalidateCache();

  // Filters applied to a directory entry (file vs. directory variants). NOTE(review): the meaning of
  // the returned bool (keep vs. skip) is defined in ListSFTP.cpp -- confirm before relying on it.
  bool filter(const std::string& parent_path, const std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>& sftp_child);
  bool filterFile(const std::string& parent_path, const std::string& filename, const LIBSSH2_SFTP_ATTRIBUTES& attrs);
  bool filterDirectory(const std::string& parent_path, const std::string& filename, const LIBSSH2_SFTP_ATTRIBUTES& attrs);

  // Creates a FlowFile describing `child` (listed on hostname:port as username) and transfers it.
  // NOTE(review): the returned bool presumably reports success -- confirm.
  bool createAndTransferFlowFileFromChild(
      const std::shared_ptr<core::ProcessSession>& session,
      const std::string& hostname,
      uint16_t port,
      const std::string& username,
      const Child& child);

  // Persist / restore the listing state of each strategy for the given hostname, username and
  // remote path. NOTE(review): the returned bool presumably reports success -- confirm.
  bool persistTrackingTimestampsCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path);
  bool updateFromTrackingTimestampsCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path);
  bool persistTrackingEntitiesCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path);
  bool updateFromTrackingEntitiesCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path);

  // Lists `files` using the "Tracking Timestamps" strategy.
  void listByTrackingTimestamps(
      const std::shared_ptr<core::ProcessContext>& context,
      const std::shared_ptr<core::ProcessSession>& session,
      const std::string& hostname,
      uint16_t port,
      const std::string& username,
      const std::string& remote_path,
      std::vector<Child>&& files);

  // Lists `files` using the "Tracking Entities" strategy with the given tracking time window.
  void listByTrackingEntities(
      const std::shared_ptr<core::ProcessContext>& context,
      const std::shared_ptr<core::ProcessSession>& session,
      const std::string& hostname,
      uint16_t port,
      const std::string& username,
      const std::string& remote_path,
      std::chrono::milliseconds entity_tracking_time_window,
      std::vector<Child>&& files);
};

}  // namespace org::apache::nifi::minifi::processors