extensions/sftp/processors/ListSFTP.cpp (725 lines of code) (raw):
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ListSFTP.h"
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <deque>
#include <iterator>
#include <limits>
#include <list>
#include <map>
#include <memory>
#include <set>
#include <tuple>
#include <utility>
#include <vector>
#include "core/FlowFile.h"
#include "core/ProcessContext.h"
#include "core/Resource.h"
#include "io/BufferStream.h"
#include "rapidjson/ostreamwrapper.h"
#include "utils/ConfigurationUtils.h"
#include "utils/StringUtils.h"
#include "utils/TimeUtil.h"
#include "utils/file/FileUtils.h"
#include "utils/ProcessorConfigUtils.h"
using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::processors {
namespace {
/**
 * Converts an optional wall-clock time point to milliseconds since the Unix epoch, as stored in the processor state.
 * An empty optional is encoded as 0.
 */
uint64_t toUnixTime(const std::optional<std::chrono::system_clock::time_point> time_point) {
  if (time_point.has_value()) {
    const auto millis_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds>(time_point.value().time_since_epoch());
    return static_cast<uint64_t>(millis_since_epoch.count());
  }
  return 0;
}

/**
 * Inverse of toUnixTime: interprets `timestamp` as milliseconds since the Unix epoch, mapping 0 back to an empty optional.
 */
std::optional<std::chrono::system_clock::time_point> fromUnixTime(const uint64_t timestamp) {
  std::optional<std::chrono::system_clock::time_point> result;
  if (timestamp != 0) {
    result.emplace(std::chrono::milliseconds{timestamp});
  }
  return result;
}
}  // namespace
/**
 * Registers the supported properties and relationships of the processor.
 */
void ListSFTP::initialize() {
  // The trace message names this processor (it used to say "FetchSFTP", a copy/paste leftover).
  logger_->log_trace("Initializing ListSFTP");
  setSupportedProperties(Properties);
  setSupportedRelationships(Relationships);
}
/**
 * Constructs the processor and assigns logger_ a ListSFTP logger tagged with this instance's uuid_.
 */
ListSFTP::ListSFTP(std::string_view name, const utils::Identifier& uuid /*= utils::Identifier()*/)
: SFTPProcessorBase(name, uuid) {
logger_ = core::logging::LoggerFactory<ListSFTP>::getLogger(uuid_);
}
// Defaulted out of line (presumably so the header does not need complete definitions of all member types -- TODO confirm).
ListSFTP::~ListSFTP() = default;
/**
 * Reads the processor configuration when the processor is scheduled.
 *
 * Besides the common SFTP properties, this parses the following and then starts the keepalive thread if needed:
 * - the listing strategy (invalidating the in-memory tracking state if it changed),
 * - the traversal options,
 * - the file and path filter regexes,
 * - the target timestamp precision,
 * - the initial listing target,
 * - the age and size limits.
 *
 * @throws Exception (PROCESSOR_EXCEPTION) if the context provides no StateManager
 */
void ListSFTP::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
  parseCommonPropertiesOnSchedule(context);

  state_manager_ = context.getStateManager();
  if (state_manager_ == nullptr) {
    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
  }

  listing_strategy_ = utils::parseProperty(context, ListingStrategy);
  /* The tracking state collected by one strategy is meaningless for the other one */
  if (!last_listing_strategy_.empty() && last_listing_strategy_ != listing_strategy_) {
    invalidateCache();
  }
  last_listing_strategy_ = listing_strategy_;

  search_recursively_ = utils::parseBoolProperty(context, SearchRecursively);
  follow_symlink_ = utils::parseBoolProperty(context, FollowSymlink);
  ignore_dotted_files_ = utils::parseBoolProperty(context, IgnoreDottedFiles);

  /*
   * The compiled regexes are members that keep their value between onSchedule calls. They are cleared before being
   * recompiled, otherwise a filter that has since been removed (or replaced by one that fails to compile) would
   * silently keep being applied.
   */
  file_filter_regex_ = context.getProperty(FileFilterRegex).value_or("");
  compiled_file_filter_regex_.reset();
  if (!file_filter_regex_.empty()) {
    try {
      compiled_file_filter_regex_ = utils::Regex(file_filter_regex_);
    } catch (const Exception&) {
      logger_->log_error("Failed to compile File Filter Regex \"{}\"", file_filter_regex_);
    }
  }

  path_filter_regex_ = context.getProperty(PathFilterRegex).value_or("");
  compiled_path_filter_regex_.reset();
  if (!path_filter_regex_.empty()) {
    try {
      compiled_path_filter_regex_ = utils::Regex(path_filter_regex_);
    } catch (const Exception&) {
      logger_->log_error("Failed to compile Path Filter Regex \"{}\"", path_filter_regex_);
    }
  }

  target_system_timestamp_precision_ = utils::parseProperty(context, TargetSystemTimestampPrecision);
  entity_tracking_initial_listing_target_ = utils::parseProperty(context, EntityTrackingInitialListingTarget);
  minimum_file_age_ = utils::parseDurationProperty(context, MinimumFileAge);
  maximum_file_age_ = utils::parseOptionalDurationProperty(context, MaximumFileAge);
  minimum_file_size_ = utils::parseDataSizeProperty(context, MinimumFileSize);
  maximum_file_size_ = utils::parseOptionalDataSizeProperty(context, MaximumFileSize);

  startKeepaliveThreadIfNeeded();
}
/**
 * Forgets all in-memory listing state so that the persisted state is reloaded (and re-validated) on the next listing.
 * Called when a property that the tracking state depends on has changed.
 */
void ListSFTP::invalidateCache() {
  logger_->log_warn("Important properties have been reconfigured, invalidating in-memory cache");

  /* Force a reload from the StateManager on the next run */
  already_loaded_from_cache_ = false;
  initial_listing_complete_ = false;

  /* Tracking Timestamps state */
  last_run_time_ = {};
  last_listed_latest_entry_timestamp_ = std::nullopt;
  last_processed_latest_entry_timestamp_ = std::nullopt;
  latest_identifiers_processed_.clear();

  /* Tracking Entities state */
  already_listed_entities_.clear();
}
/**
 * Builds a listing entry from one (filename, longentry, attributes) tuple returned by the directory listing.
 * The long entry is not kept; the entry counts as a directory when its permission bits say so.
 */
ListSFTP::Child::Child(const std::string& parent_path_, std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>&& sftp_child)
    : parent_path(parent_path_) {
  filename = std::move(std::get<0>(sftp_child));
  attrs = std::move(std::get<2>(sftp_child));
  directory = LIBSSH2_SFTP_S_ISDIR(attrs.permissions);
}

/**
 * Returns the full remote path of the entry (parent path joined with the filename) using '/' separators.
 */
std::string ListSFTP::Child::getPath() const {
  const auto full_path = parent_path / filename;
  return full_path.generic_string();
}
/**
 * Decides whether a directory child returned by the SFTP listing should be kept.
 *
 * Empty names, "." and "..", dotted names (when Ignore Dotted Files is set), entries without permission
 * information and entries that are neither regular files nor directories are rejected. Regular files and
 * directories are delegated to filterFile / filterDirectory.
 *
 * @return true if the child should be listed (file) or descended into (directory)
 */
bool ListSFTP::filter(const std::string& parent_path, const std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>& sftp_child) {
  const auto& filename = std::get<0>(sftp_child);
  const auto& attrs = std::get<2>(sftp_child);

  /* A server should never return an unnamed child */
  if (filename.empty()) {
    logger_->log_error("Listing directory \"{}\" returned an empty child", parent_path.c_str());
    return false;
  }

  /* The current and parent directory entries are never listed */
  const bool is_self_or_parent = filename == "." || filename == "..";
  if (is_self_or_parent) {
    return false;
  }

  if (ignore_dotted_files_ && filename.front() == '.') {
    logger_->log_debug("Ignoring \"{}/{}\" because Ignore Dotted Files is true", parent_path.c_str(), filename.c_str());
    return false;
  }

  /* The entry type is derived from the permission bits, so they must be present */
  if ((attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS) == 0U) {
    // TODO(Bakai): maybe do a fallback stat here
    logger_->log_error("Failed to get permissions in stat for \"{}/{}\"", parent_path.c_str(), filename.c_str());
    return false;
  }

  if (LIBSSH2_SFTP_S_ISREG(attrs.permissions)) {
    return filterFile(parent_path, filename, attrs);
  }
  if (LIBSSH2_SFTP_S_ISDIR(attrs.permissions)) {
    return filterDirectory(parent_path, filename, attrs);
  }

  logger_->log_debug("Skipping non-regular, non-directory file \"{}/{}\"", parent_path.c_str(), filename.c_str());
  return false;
}
/**
 * Applies the file-level filters to a regular file.
 *
 * The file is rejected if any of the following holds:
 * - its ownership, size or modification time attributes are missing,
 * - it falls outside the configured minimum/maximum age,
 * - it falls outside the configured minimum/maximum size,
 * - it does not match the File Filter Regex (when one is compiled).
 *
 * @return true if the file should be listed
 */
bool ListSFTP::filterFile(const std::string& parent_path, const std::string& filename, const LIBSSH2_SFTP_ATTRIBUTES& attrs) {
  const bool has_required_attributes = (attrs.flags & LIBSSH2_SFTP_ATTR_UIDGID) != 0U &&
                                       (attrs.flags & LIBSSH2_SFTP_ATTR_SIZE) != 0U &&
                                       (attrs.flags & LIBSSH2_SFTP_ATTR_ACMODTIME) != 0U;
  if (!has_required_attributes) {
    // TODO(Bakai): maybe do a fallback stat here
    logger_->log_error("Failed to get all attributes in stat for \"{}/{}\"", parent_path.c_str(), filename.c_str());
    return false;
  }

  /* Age, measured from the remote modification time to the local wall clock */
  const auto modification_time = std::chrono::system_clock::from_time_t(gsl::narrow<time_t>(attrs.mtime));
  const auto file_age = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - modification_time);
  if (file_age < minimum_file_age_) {
    logger_->log_debug("Ignoring \"{}/{}\" because it is younger than the Minimum File Age: {} < {}",
        parent_path.c_str(), filename.c_str(), file_age, minimum_file_age_);
    return false;
  }
  if (maximum_file_age_ && file_age > *maximum_file_age_) {
    logger_->log_debug("Ignoring \"{}/{}\" because it is older than the Maximum File Age: {} > {}",
        parent_path.c_str(), filename.c_str(), file_age, *maximum_file_age_);
    return false;
  }

  /* Size */
  if (attrs.filesize < minimum_file_size_) {
    logger_->log_debug("Ignoring \"{}/{}\" because it is smaller than the Minimum File Size: {} B < {} B",
        parent_path.c_str(), filename.c_str(), attrs.filesize, minimum_file_size_);
    return false;
  }
  if (maximum_file_size_ && attrs.filesize > *maximum_file_size_) {
    logger_->log_debug("Ignoring \"{}/{}\" because it is larger than the Maximum File Size: {} B > {} B",
        parent_path.c_str(), filename.c_str(), attrs.filesize, *maximum_file_size_);
    return false;
  }

  /* File Filter Regex, matched against the bare filename */
  if (compiled_file_filter_regex_ && !utils::regexMatch(filename, *compiled_file_filter_regex_)) {
    logger_->log_debug(R"(Ignoring "{}/{}" because it did not match the File Filter Regex "{}")",
        parent_path.c_str(), filename.c_str(), file_filter_regex_);
    return false;
  }

  return true;
}
/**
 * Decides whether a subdirectory should be descended into.
 *
 * Requires Search Recursively. When a Path Filter Regex is compiled, the "parent/filename" path must match it.
 *
 * @return true if the directory should be traversed
 */
bool ListSFTP::filterDirectory(const std::string& parent_path, const std::string& filename, const LIBSSH2_SFTP_ATTRIBUTES& /*attrs*/) {
  if (!search_recursively_) {
    return false;
  }
  /* Without a Path Filter Regex every subdirectory qualifies */
  if (!compiled_path_filter_regex_) {
    return true;
  }

  const std::string dir_path = utils::string::join_pack(parent_path, "/", filename);
  if (utils::regexMatch(dir_path, *compiled_path_filter_regex_)) {
    return true;
  }

  logger_->log_debug(R"(Not recursing into "{}" because it did not match the Path Filter Regex "{}")",
      dir_path.c_str(),
      path_filter_regex_);
  return false;
}
/**
 * Creates an empty FlowFile describing a listed remote file and transfers it to Success.
 *
 * The FlowFile carries the remote host, port and listing user, plus the file's:
 * - owner and group,
 * - permission bits (zero-padded octal),
 * - size,
 * - last modification time,
 * - filename and parent path.
 *
 * @return false only if the FlowFile could not be created. A file whose modification time does not fit into
 *         int64_t is logged and skipped, but still reported as true.
 */
bool ListSFTP::createAndTransferFlowFileFromChild(
    core::ProcessSession& session,
    const std::string& hostname,
    uint16_t port,
    const std::string& username,
    const ListSFTP::Child& child) {
  const auto& attrs = child.attrs;

  /* The modification time is converted through signed seconds, so it has to fit into int64_t */
  if (attrs.mtime > gsl::narrow<uint64_t>(std::numeric_limits<int64_t>::max())) {
    logger_->log_error("Modification date {} of \"{}/{}\" larger than int64_t max", attrs.mtime, child.parent_path.string(), child.filename);
    return true;
  }
  auto last_modified = utils::timeutils::getDateTimeStr(date::sys_seconds{std::chrono::seconds(attrs.mtime)});

  auto flow_file = session.create();
  if (!flow_file) {
    logger_->log_error("Failed to create FlowFileRecord");
    return false;
  }

  const auto put = [&session, &flow_file](const auto& key, const std::string& value) {
    session.putAttribute(*flow_file, key, value);
  };

  /* Connection details */
  put(ATTRIBUTE_SFTP_REMOTE_HOST, hostname);
  put(ATTRIBUTE_SFTP_REMOTE_PORT, std::to_string(port));
  put(ATTRIBUTE_SFTP_LISTING_USER, username);

  /* Ownership and permission bits (file type bits masked off), e.g. "0644" */
  put(ATTRIBUTE_FILE_OWNER, std::to_string(attrs.uid));
  put(ATTRIBUTE_FILE_GROUP, std::to_string(attrs.gid));
  std::stringstream permissions;
  permissions << std::oct << std::setfill('0') << std::setw(4) << (attrs.permissions & 0777);
  put(ATTRIBUTE_FILE_PERMISSIONS, permissions.str());

  /* Size and modification time */
  put(ATTRIBUTE_FILE_SIZE, std::to_string(attrs.filesize));
  put(ATTRIBUTE_FILE_LASTMODIFIEDTIME, last_modified);

  flow_file->setAttribute(core::SpecialFlowAttribute::FILENAME, child.filename.generic_string());
  flow_file->setAttribute(core::SpecialFlowAttribute::PATH, child.parent_path.generic_string());

  session.transfer(flow_file, Success);
  return true;
}
/**
 * A tracked entity with no recorded modification time or size (both zero).
 */
ListSFTP::ListedEntity::ListedEntity()
    : ListedEntity(0U, 0U) {
}

/**
 * A tracked entity with the given modification timestamp (milliseconds since the epoch) and size in bytes.
 */
ListSFTP::ListedEntity::ListedEntity(uint64_t timestamp_, uint64_t size_)
    : timestamp(timestamp_),
      size(size_) {
}
/**
 * Stores the Tracking Timestamps state through the StateManager.
 *
 * The state consists of:
 * - the listing settings, used on load to detect stale state;
 * - the last listed and last processed timestamps, in milliseconds since the epoch (0 = unset);
 * - the identifiers processed for the latest timestamp, as "id.<n>".
 *
 * @return the result of StateManager::set
 */
bool ListSFTP::persistTrackingTimestampsCache(core::ProcessContext& /*context*/, const std::string& hostname, const std::string& username, const std::string& remote_path) {
  std::unordered_map<std::string, std::string> state{
      {"hostname", hostname},
      {"username", username},
      {"remote_path", remote_path},
      {"listing.timestamp", std::to_string(toUnixTime(last_listed_latest_entry_timestamp_))},
      {"processed.timestamp", std::to_string(toUnixTime(last_processed_latest_entry_timestamp_))},
  };
  state["listing_strategy"] = LISTING_STRATEGY_TRACKING_TIMESTAMPS;

  size_t next_id = 0;
  for (const auto& identifier : latest_identifiers_processed_) {
    state.emplace("id." + std::to_string(next_id), identifier);
    ++next_id;
  }

  return state_manager_->set(state);
}
/**
 * Restores the Tracking Timestamps state from the StateManager.
 *
 * The in-memory state is only replaced when all of the following hold:
 * - every required key is present,
 * - both timestamps parse as unsigned integers,
 * - the stored listing strategy, hostname, username and remote path match the current ones.
 *
 * @return true if the state was loaded, false otherwise (the in-memory state is then left unchanged)
 */
bool ListSFTP::updateFromTrackingTimestampsCache(core::ProcessContext& /*context*/, const std::string& hostname, const std::string& username, const std::string& remote_path) {
  std::unordered_map<std::string, std::string> state_map;
  if (!state_manager_->get(state_map)) {
    logger_->log_info("Found no stored state");
    return false;
  }

  /* Pointer to the stored value of `key`, or nullptr if it is absent */
  const auto find_value = [&state_map](const std::string& key) -> const std::string* {
    const auto it = state_map.find(key);
    return it != state_map.end() ? &it->second : nullptr;
  };
  /* Stored value of `key` parsed as an unsigned integer; empty if absent or unparsable */
  const auto find_uint64 = [&find_value](const std::string& key) -> std::optional<uint64_t> {
    const std::string* value = find_value(key);
    if (value == nullptr) {
      return std::nullopt;
    }
    try {
      return std::stoull(*value);
    } catch (...) {
      return std::nullopt;
    }
  };

  const std::string* state_listing_strategy = find_value("listing_strategy");
  if (state_listing_strategy == nullptr) {
    logger_->log_error("listing_strategy is missing from state");
    return false;
  }
  const std::string* state_hostname = find_value("hostname");
  if (state_hostname == nullptr) {
    logger_->log_error("hostname is missing from state");
    return false;
  }
  const std::string* state_username = find_value("username");
  if (state_username == nullptr) {
    logger_->log_error("username is missing from state");
    return false;
  }
  const std::string* state_remote_path = find_value("remote_path");
  if (state_remote_path == nullptr) {
    logger_->log_error("remote_path is missing from state");
    return false;
  }
  const auto state_listing_timestamp = find_uint64("listing.timestamp");
  if (!state_listing_timestamp) {
    logger_->log_error("listing.timestamp is missing from state or is invalid");
    return false;
  }
  const auto state_processed_timestamp = find_uint64("processed.timestamp");
  if (!state_processed_timestamp) {
    logger_->log_error("processed.timestamp is missing from state or is invalid");
    return false;
  }

  /* Every "id.<n>" entry holds one identifier processed for the latest timestamp */
  std::set<std::string> state_ids;
  for (const auto& [key, value] : state_map) {
    if (key.rfind("id.", 0) == 0) {
      state_ids.emplace(value);
    }
  }

  const bool settings_match = *state_listing_strategy == listing_strategy_ &&
                              *state_hostname == hostname &&
                              *state_username == username &&
                              *state_remote_path == remote_path;
  if (!settings_match) {
    logger_->log_error(
        "Processor state was persisted with different settings than the current ones, ignoring. "
        "Listing Strategy: \"{}\" vs. \"{}\", "
        "Hostname: \"{}\" vs. \"{}\", "
        "Username: \"{}\" vs. \"{}\", "
        "Remote Path: \"{}\" vs. \"{}\"",
        *state_listing_strategy, listing_strategy_,
        *state_hostname, hostname,
        *state_username, username,
        *state_remote_path, remote_path);
    return false;
  }

  last_listed_latest_entry_timestamp_ = fromUnixTime(*state_listing_timestamp);
  last_processed_latest_entry_timestamp_ = fromUnixTime(*state_processed_timestamp);
  latest_identifiers_processed_ = std::move(state_ids);
  return true;
}
/**
 * Emits FlowFiles for the listed files using the "Tracking Timestamps" strategy.
 *
 * Files are grouped by modification time. A file is a candidate only if its timestamp is not older than the latest
 * timestamp seen by the previous listing, nor older than the latest timestamp already emitted.
 *
 * The newest group is held back while it is not yet older than the precision-dependent listing lag. Files of the last
 * emitted timestamp whose paths are already in latest_identifiers_processed_ are not emitted again. The state is
 * persisted through the StateManager when the listed timestamp changed or new FlowFiles were created.
 *
 * @param context     used for yielding and passed to the state helpers
 * @param session     session the FlowFiles are created in and transferred with
 * @param hostname    remote host; emitted as an attribute and part of the persisted state
 * @param port        remote port; emitted as an attribute
 * @param username    listing user; emitted as an attribute and part of the persisted state
 * @param remote_path listing root; part of the persisted state
 * @param files       files that passed filtering; consumed by this call
 */
void ListSFTP::listByTrackingTimestamps(
    core::ProcessContext& context,
    core::ProcessSession& session,
    const std::string& hostname,
    uint16_t port,
    const std::string& username,
    const std::string& remote_path,
    std::vector<Child>&& files) {
  /* Load state from cache file if needed */
  if (!already_loaded_from_cache_) {
    if (updateFromTrackingTimestampsCache(context, hostname, username, remote_path)) {
      logger_->log_debug("Successfully loaded state");
    } else {
      logger_->log_debug("Failed to load state");
    }
    already_loaded_from_cache_ = true;
  }

  /*
   * The lower bound must be read only after the persisted state has been restored. If it were read before the load,
   * it would be empty on the first run after a restart. Every file would then look new, and files listed before the
   * restart would be emitted again.
   */
  const auto min_timestamp_to_list = last_listed_latest_entry_timestamp_;

  std::chrono::steady_clock::time_point current_run_time = std::chrono::steady_clock::now();
  auto now = std::chrono::system_clock::now();

  /* Order children by timestamp and try to detect timestamp precision if needed */
  std::map<std::chrono::system_clock::time_point, std::list<Child>> ordered_files;
  bool target_system_has_seconds = false;
  for (auto&& file : files) {
    std::chrono::system_clock::time_point timestamp{std::chrono::seconds(file.attrs.mtime)};
    /* A timestamp that is not a whole minute shows that the server reports at least second precision */
    target_system_has_seconds |= std::chrono::round<std::chrono::minutes>(timestamp) != timestamp;
    bool new_file = !min_timestamp_to_list.has_value() || (timestamp >= min_timestamp_to_list && timestamp >= last_processed_latest_entry_timestamp_);
    if (new_file) {
      auto& files_for_timestamp = ordered_files[timestamp];
      files_for_timestamp.emplace_back(std::move(file));
    } else {
      logger_->log_trace("Skipping \"{}\", because it is not new.", file.getPath().c_str());
    }
  }

  std::optional<std::chrono::system_clock::time_point> latest_listed_entry_timestamp_this_cycle;
  size_t flow_files_created = 0U;
  if (!ordered_files.empty()) {
    latest_listed_entry_timestamp_this_cycle = ordered_files.crbegin()->first;

    std::string remote_system_timestamp_precision;
    if (target_system_timestamp_precision_ == TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT) {
      if (target_system_has_seconds) {
        logger_->log_debug("Precision auto detection detected second precision");
        remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS;
      } else {
        logger_->log_debug("Precision auto detection detected minute precision");
        remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES;
      }
    } else if (target_system_timestamp_precision_ == TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES) {
      remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES;
    } else {
      /*
       * We only have seconds-precision timestamps, TARGET_SYSTEM_TIMESTAMP_PRECISION_MILLISECONDS makes no real sense here,
       * so we will treat it as TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS.
       */
      remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS;
    }
    std::chrono::milliseconds listing_lag{utils::at(LISTING_LAG_MAP, remote_system_timestamp_precision)};
    logger_->log_debug("The listing lag is {}", listing_lag);

    /* If the latest listing time is equal to the last listing time, there are no entries with a newer timestamp than previously seen */
    if (latest_listed_entry_timestamp_this_cycle == last_listed_latest_entry_timestamp_ && latest_listed_entry_timestamp_this_cycle) {
      const auto& latest_files = ordered_files.at(*latest_listed_entry_timestamp_this_cycle);
      auto elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(current_run_time - last_run_time_);
      /* If a precision-specific listing lag has not yet elapsed since our last execution, we wait. */
      if (elapsed_time < listing_lag) {
        logger_->log_debug("The latest listed entry timestamp is the same as the last listed entry timestamp ({}) "
            "and the listing lag has not yet elapsed ({} < {}). Yielding.",
            latest_listed_entry_timestamp_this_cycle, elapsed_time, listing_lag);
        context.yield();
        return;
      }
      /*
       * If we have already processed the entities with the newest timestamp,
       * and there are no new entities with that timestamp, there is nothing to do.
       */
      if (latest_listed_entry_timestamp_this_cycle == last_processed_latest_entry_timestamp_ &&
          std::all_of(latest_files.begin(), latest_files.end(), [this](const Child& child) {
            return latest_identifiers_processed_.count(child.getPath()) == 1U;
          })) {
        logger_->log_debug("The latest listed entry timestamp is the same as the last listed entry timestamp ({}) "
            "and all files for that timestamp has been processed. Yielding.", latest_listed_entry_timestamp_this_cycle);
        context.yield();
        return;
      }
    } else {
      /* Determine the minimum reliable timestamp based on precision */
      auto minimum_reliable_timestamp = now - listing_lag;
      if (remote_system_timestamp_precision == TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS) {
        minimum_reliable_timestamp = std::chrono::floor<std::chrono::seconds>(minimum_reliable_timestamp);
      } else {
        minimum_reliable_timestamp = std::chrono::floor<std::chrono::minutes>(minimum_reliable_timestamp);
      }
      /* If the latest timestamp is not old enough, we wait another cycle */
      if (latest_listed_entry_timestamp_this_cycle && minimum_reliable_timestamp < latest_listed_entry_timestamp_this_cycle) {
        logger_->log_debug("Skipping files with latest timestamp because their modification date is not smaller than the minimum reliable timestamp: {} >= {}",
            latest_listed_entry_timestamp_this_cycle,
            minimum_reliable_timestamp);
        ordered_files.erase(*latest_listed_entry_timestamp_this_cycle);
      }
    }

    for (auto& files_for_timestamp : ordered_files) {
      if (files_for_timestamp.first == last_processed_latest_entry_timestamp_) {
        /* Filter out previously processed entities. */
        for (auto it = files_for_timestamp.second.begin(); it != files_for_timestamp.second.end();) {
          if (latest_identifiers_processed_.contains(it->getPath())) {
            it = files_for_timestamp.second.erase(it);
          } else {
            ++it;
          }
        }
      }
      for (const auto& file : files_for_timestamp.second) {
        /* Create the FlowFile for this path */
        if (createAndTransferFlowFileFromChild(session, hostname, port, username, file)) {
          flow_files_created++;
        } else {
          logger_->log_error("Failed to emit FlowFile for \"{}\"", file.filename.generic_string());
          context.yield();
          return;
        }
      }
    }
  }

  /* If we have a listing timestamp, it is worth persisting the state */
  if (latest_listed_entry_timestamp_this_cycle) {
    bool processed_new_files = flow_files_created > 0U;
    if (processed_new_files) {
      /* Remember which files of the newest emitted timestamp were processed, so they are not emitted again */
      auto last_files_it = ordered_files.crbegin();
      if (last_files_it->first != last_processed_latest_entry_timestamp_) {
        latest_identifiers_processed_.clear();
      }
      for (const auto& last_file : last_files_it->second) {
        latest_identifiers_processed_.insert(last_file.getPath());
      }
      last_processed_latest_entry_timestamp_ = last_files_it->first;
    }
    last_run_time_ = current_run_time;
    if (latest_listed_entry_timestamp_this_cycle != last_listed_latest_entry_timestamp_ || processed_new_files) {
      last_listed_latest_entry_timestamp_ = latest_listed_entry_timestamp_this_cycle;
      persistTrackingTimestampsCache(context, hostname, username, remote_path);
    }
  } else {
    logger_->log_debug("There are no files to list. Yielding.");
    context.yield();
    return;
  }
}
/**
 * Stores the Tracking Entities state through the StateManager.
 *
 * The state consists of:
 * - the listing settings, used on load to detect stale state;
 * - every tracked entity, stored as "entity.<n>.name", "entity.<n>.timestamp" and "entity.<n>.size".
 *
 * @return the result of StateManager::set
 */
bool ListSFTP::persistTrackingEntitiesCache(core::ProcessContext& /*context*/, const std::string& hostname, const std::string& username, const std::string& remote_path) {
  std::unordered_map<std::string, std::string> state{
      {"listing_strategy", listing_strategy_},
      {"hostname", hostname},
      {"username", username},
      {"remote_path", remote_path},
  };

  size_t index = 0;
  for (const auto& [path, entity] : already_listed_entities_) {
    const std::string key_prefix = "entity." + std::to_string(index);
    state[key_prefix + ".name"] = path;
    state[key_prefix + ".timestamp"] = std::to_string(entity.timestamp);
    state[key_prefix + ".size"] = std::to_string(entity.size);
    ++index;
  }

  return state_manager_->set(state);
}
/**
 * Restores the Tracking Entities state from the StateManager.
 *
 * Entities are read as "entity.<n>.*" for n = 0, 1, ... until the first missing name. Entities with a missing or
 * unparsable timestamp or size are skipped with an error. The in-memory state is only replaced when the stored
 * listing strategy, hostname, username and remote path match the current ones.
 *
 * @return true if the state was loaded, false otherwise (the in-memory state is then left unchanged)
 */
bool ListSFTP::updateFromTrackingEntitiesCache(core::ProcessContext& /*context*/, const std::string& hostname, const std::string& username, const std::string& remote_path) {
  std::unordered_map<std::string, std::string> state_map;
  if (!state_manager_->get(state_map)) {
    logger_->log_debug("Failed to get state from StateManager");
    return false;
  }

  /* Pointer to the stored value of `key`, or nullptr if it is absent */
  const auto find_value = [&state_map](const std::string& key) -> const std::string* {
    const auto it = state_map.find(key);
    return it != state_map.end() ? &it->second : nullptr;
  };

  const std::string* state_listing_strategy = find_value("listing_strategy");
  if (state_listing_strategy == nullptr) {
    logger_->log_error("listing_strategy is missing from state");
    return false;
  }
  const std::string* state_hostname = find_value("hostname");
  if (state_hostname == nullptr) {
    logger_->log_error("hostname is missing from state");
    return false;
  }
  const std::string* state_username = find_value("username");
  if (state_username == nullptr) {
    logger_->log_error("username is missing from state");
    return false;
  }
  const std::string* state_remote_path = find_value("remote_path");
  if (state_remote_path == nullptr) {
    logger_->log_error("remote_path is missing from state");
    return false;
  }

  std::unordered_map<std::string, ListedEntity> new_already_listed_entities;
  for (size_t index = 0U;; ++index) {
    const std::string key_prefix = "entity." + std::to_string(index);
    const std::string* name = find_value(key_prefix + ".name");
    if (name == nullptr) {
      break;
    }
    try {
      const uint64_t timestamp = std::stoull(state_map.at(key_prefix + ".timestamp"));
      const uint64_t size = std::stoull(state_map.at(key_prefix + ".size"));
      new_already_listed_entities.try_emplace(*name, timestamp, size);
    } catch (...) {
      logger_->log_error("State for entity \"{}\" is missing or invalid, skipping", *name);
    }
  }

  const bool settings_match = *state_listing_strategy == listing_strategy_ &&
                              *state_hostname == hostname &&
                              *state_username == username &&
                              *state_remote_path == remote_path;
  if (!settings_match) {
    logger_->log_error(
        "Processor state was persisted with different settings than the current ones, ignoring. "
        "Listing Strategy: \"{}\" vs. \"{}\", "
        "Hostname: \"{}\" vs. \"{}\", "
        "Username: \"{}\" vs. \"{}\", "
        "Remote Path: \"{}\" vs. \"{}\"",
        *state_listing_strategy, listing_strategy_,
        *state_hostname, hostname,
        *state_username, username,
        *state_remote_path, remote_path);
    return false;
  }

  already_listed_entities_ = std::move(new_already_listed_entities);
  return true;
}
/**
 * Emits FlowFiles for the listed files using the "Tracking Entities" strategy.
 *
 * Only files modified within the tracking time window are considered. The exception is the initial listing when the
 * initial listing target is ENTITY_TRACKING_INITIAL_LISTING_TARGET_ALL_AVAILABLE, which considers every file.
 * A FlowFile is emitted for each file that is not tracked yet, has a newer modification time, or has a different size
 * than when it was tracked. Tracked entities that fell out of the window are forgotten, and the resulting state is
 * persisted.
 *
 * @param context                     used for yielding and passed to the state helpers
 * @param session                     session the FlowFiles are created in and transferred with
 * @param hostname                    remote host; emitted as an attribute and part of the persisted state
 * @param port                        remote port; emitted as an attribute
 * @param username                    listing user; emitted as an attribute and part of the persisted state
 * @param remote_path                 listing root; part of the persisted state
 * @param entity_tracking_time_window how far back (from now) modification times are tracked
 * @param files                       files that passed filtering; consumed by this call
 */
void ListSFTP::listByTrackingEntities(
    core::ProcessContext& context,
    core::ProcessSession& session,
    const std::string& hostname,
    uint16_t port,
    const std::string& username,
    const std::string& remote_path,
    std::chrono::milliseconds entity_tracking_time_window,
    std::vector<Child>&& files) {
  /* Load state from cache file if needed */
  if (!already_loaded_from_cache_) {
    if (updateFromTrackingEntitiesCache(context, hostname, username, remote_path)) {
      logger_->log_debug("Successfully loaded state");
      initial_listing_complete_ = true;
    } else {
      logger_->log_debug("Failed to load state");
    }
    already_loaded_from_cache_ = true;
  }

  /* Lower bound of the tracking window in milliseconds since the epoch; 0 means no lower bound */
  uint64_t min_timestamp_to_list = 0U;
  const bool list_all_available = !initial_listing_complete_ && entity_tracking_initial_listing_target_ == ENTITY_TRACKING_INITIAL_LISTING_TARGET_ALL_AVAILABLE;
  if (!list_all_available) {
    const int64_t now_ms = static_cast<int64_t>(time(nullptr)) * 1000;
    const int64_t window_start_ms = now_ms - static_cast<int64_t>(entity_tracking_time_window.count());
    /*
     * Clamp at the epoch. With unsigned arithmetic, a window longer than the time since the epoch would wrap around
     * to a huge lower bound, and every file would be filtered out forever.
     */
    min_timestamp_to_list = window_start_ms > 0 ? static_cast<uint64_t>(window_start_ms) : 0U;
  }

  /* Skip files not in the tracking window (single pass instead of repeated vector::erase) */
  files.erase(std::remove_if(files.begin(), files.end(), [&](const Child& child) {
    if (child.attrs.mtime * 1000 < min_timestamp_to_list) {
      logger_->log_trace("Skipping \"{}\" because it has an older timestamp than the minimum timestamp to list: {} < {}",
          child.getPath(), child.attrs.mtime * 1000, min_timestamp_to_list);
      return true;
    }
    return false;
  }), files.end());
  if (files.empty()) {
    logger_->log_debug("No entities to list within the tracking time window");
    context.yield();
    return;
  }

  /* Find files that have been updated */
  std::vector<Child> updated_entities;
  std::copy_if(std::make_move_iterator(files.begin()),
      std::make_move_iterator(files.end()),
      std::back_inserter(updated_entities),
      [&](const Child& child) {
        auto already_listed_it = already_listed_entities_.find(child.getPath());
        if (already_listed_it == already_listed_entities_.end()) {
          logger_->log_trace("Found new file \"{}\"", child.getPath());
          return true;
        }
        if (child.attrs.mtime * 1000 > already_listed_it->second.timestamp) {
          logger_->log_trace("Found file \"{}\" with newer timestamp: {} -> {}",
              child.getPath(),
              already_listed_it->second.timestamp,
              child.attrs.mtime * 1000);
          return true;
        }
        if (child.attrs.filesize != already_listed_it->second.size) {
          logger_->log_trace("Found file \"{}\" with different size: {} -> {}",
              child.getPath(),
              already_listed_it->second.size,
              child.attrs.filesize);
          return true;
        }
        logger_->log_trace("Skipping file \"{}\" because it has not changed", child.getPath());
        return false;
      });

  /* Find entities in the tracking cache that are no longer in the tracking window */
  std::vector<std::string> old_entity_ids;
  for (const auto& already_listed_entity : already_listed_entities_) {
    if (already_listed_entity.second.timestamp < min_timestamp_to_list) {
      old_entity_ids.emplace_back(already_listed_entity.first);
    }
  }

  /* If we have no new files and no expired tracked entities, we have nothing to do */
  if (updated_entities.empty() && old_entity_ids.empty()) {
    context.yield();
    return;
  }

  /* Remove expired entities */
  for (const auto& old_entity_id : old_entity_ids) {
    already_listed_entities_.erase(old_entity_id);
  }

  for (const auto& updated_entity : updated_entities) {
    /* Create the FlowFile for this path */
    if (!createAndTransferFlowFileFromChild(session, hostname, port, username, updated_entity)) {
      logger_->log_error("Failed to emit FlowFile for \"{}\"", updated_entity.getPath());
      context.yield();
      return;
    }
    already_listed_entities_[updated_entity.getPath()] = ListedEntity(updated_entity.attrs.mtime * 1000, updated_entity.attrs.filesize);
  }

  initial_listing_complete_ = true;
  persistTrackingEntitiesCache(context, hostname, username, remote_path);
}
/**
 * Lists the Remote Path on the SFTP server and emits FlowFiles for the files found, using the configured Listing Strategy.
 *
 * The directory tree is walked (subdirectories are only queued when filter() accepts them), and the accepted files are
 * handed to listByTrackingTimestamps or listByTrackingEntities. The SFTP connection is taken from the connection cache
 * (or created). It is put back into the cache only after the listing strategy has run. The early returns before that
 * point (property parsing failure, no client, unknown listing strategy) do not return it to the cache.
 */
void ListSFTP::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
/* Parse common properties */
SFTPProcessorBase::CommonProperties common_properties;
if (!parseCommonPropertiesOnTrigger(context, nullptr /*flow_file*/, common_properties)) {
context.yield();
return;
}
std::string remote_path_str = context.getProperty(RemotePath).value_or("");
/* Remove trailing slashes */
// A lone "/" is kept, so the root directory can still be listed.
while (remote_path_str.size() > 1 && remote_path_str.ends_with('/')) {
remote_path_str.pop_back();
}
std::filesystem::path remote_path{remote_path_str, std::filesystem::path::format::generic_format};
// Falls back to the 3 hour default when the property is unset, and (after logging an error) when it cannot be parsed.
std::chrono::milliseconds entity_tracking_time_window = 3h; /* The default is 3 hours */
if (const auto entity_tracking_time_window_str = context.getProperty(EntityTrackingTimeWindow)) {
if (auto parsed_entity_time_window = utils::timeutils::StringToDuration<std::chrono::milliseconds>(*entity_tracking_time_window_str)) {
entity_tracking_time_window = parsed_entity_time_window.value();
} else {
logger_->log_error("Entity Tracking Time Window attribute is invalid");
}
}
/* Check whether we need to invalidate the cache based on the new properties */
// The in-memory tracking state belongs to one hostname/username/remote path combination. The last_* values are
// empty until the first trigger has recorded them, so nothing is invalidated then.
if ((!last_hostname_.empty() && last_hostname_ != common_properties.hostname) ||
(!last_username_.empty() && last_username_ != common_properties.username) ||
(!last_remote_path_.empty() && last_remote_path_ != remote_path)) {
invalidateCache();
}
last_hostname_ = common_properties.hostname;
last_username_ = common_properties.username;
last_remote_path_ = remote_path;
/* Get SFTPClient from cache or create it */
const SFTPProcessorBase::ConnectionCacheKey connection_cache_key = {common_properties.hostname,
common_properties.port,
common_properties.username,
proxy_type_,
common_properties.proxy_host,
common_properties.proxy_port,
common_properties.proxy_username};
const auto buffer_size = utils::configuration::getBufferSize(*context.getConfiguration());
auto client = getOrCreateConnection(connection_cache_key,
common_properties.password,
common_properties.private_key_path,
common_properties.private_key_passphrase,
common_properties.proxy_password,
buffer_size);
if (client == nullptr) {
context.yield();
return;
}
/*
* Unless we're sure that the connection is good, we don't want to put it back to the cache.
* So we will only call this when we're sure that the connection is OK.
*/
// Moves `client` into the cache: `client` must not be used after this has been called.
auto put_connection_back_to_cache = [this, &connection_cache_key, &client]() {
addConnectionToCache(connection_cache_key, std::move(client));
};
// Breadth-first traversal: directories are queued at the back and processed from the front.
std::deque<Child> directories;
std::vector<Child> files;
/* Add initial directory */
Child root;
root.parent_path = remote_path.parent_path();
root.filename = remote_path.filename();
root.directory = true;
directories.emplace_back(std::move(root));
/* Process directories */
while (!directories.empty()) {
auto directory = std::move(directories.front());
directories.pop_front();
std::string new_parent_path;
// A directory without a parent (e.g. a relative root) is listed by its bare name.
if (directory.parent_path.empty()) {
new_parent_path = directory.filename.generic_string();
} else {
new_parent_path = directory.getPath();
}
std::vector<std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>> dir_children;
// A directory that cannot be listed is skipped; the traversal continues with the remaining ones.
if (!client->listDirectory(new_parent_path, follow_symlink_, dir_children)) {
continue;
}
for (auto&& dir_child : dir_children) {
if (filter(new_parent_path, dir_child)) {
Child child(new_parent_path, std::move(dir_child));
if (child.directory) {
directories.emplace_back(std::move(child));
} else {
files.emplace_back(std::move(child));
}
}
}
}
/* Process the files with the appropriate tracking strategy */
if (listing_strategy_ == LISTING_STRATEGY_TRACKING_TIMESTAMPS) {
listByTrackingTimestamps(context, session, common_properties.hostname, common_properties.port, common_properties.username, remote_path.generic_string(), std::move(files));
} else if (listing_strategy_ == LISTING_STRATEGY_TRACKING_ENTITIES) {
listByTrackingEntities(context, session, common_properties.hostname, common_properties.port,
common_properties.username, remote_path.generic_string(), entity_tracking_time_window, std::move(files));
} else {
logger_->log_error("Unknown Listing Strategy: \"{}\"", listing_strategy_.c_str());
context.yield();
return;
}
put_connection_back_to_cache();
}
REGISTER_RESOURCE(ListSFTP, Processor);
} // namespace org::apache::nifi::minifi::processors