in extensions/sftp/processors/ListSFTP.cpp [806:918]
/**
 * Lists the configured remote directory tree over SFTP and hands the files found to the
 * configured listing strategy.
 *
 * Steps:
 *  1. Parses the common SFTP properties, the Remote Path and the Entity Tracking Time Window
 *     (defaulting to 3 hours when unset or unparsable).
 *  2. Invalidates the listing cache if the hostname, username or remote path changed since the
 *     previous trigger.
 *  3. Gets an SFTP client from the connection cache, or creates one.
 *  4. Walks the directory tree breadth-first (FIFO queue) starting at the remote path. Only
 *     children accepted by filter() are kept; directories are queued for listing and everything
 *     else is collected as a file.
 *  5. Passes the collected files to listByTrackingTimestamps() or listByTrackingEntities().
 *
 * The context is yielded when the common properties cannot be parsed, when no connection can be
 * obtained, when the remote path itself cannot be listed, or when the listing strategy is unknown.
 * The connection is returned to the cache only after a listing strategy has run.
 *
 * @param context processor context used for property lookups and yielding
 * @param session session forwarded to the listing strategy
 */
void ListSFTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
  /* Parse common properties */
  SFTPProcessorBase::CommonProperties common_properties;
  if (!parseCommonPropertiesOnTrigger(context, nullptr /*flow_file*/, common_properties)) {
    context->yield();
    return;
  }

  std::string remote_path_str;
  context->getProperty(RemotePath, remote_path_str);
  /* Remove trailing slashes, but keep a lone "/" so that the root directory can still be listed */
  while (remote_path_str.size() > 1 && remote_path_str.ends_with('/')) {
    remote_path_str.pop_back();
  }
  std::filesystem::path remote_path{remote_path_str, std::filesystem::path::format::generic_format};

  std::string value;
  std::chrono::milliseconds entity_tracking_time_window = 3h; /* The default is 3 hours */
  if (context->getProperty(EntityTrackingTimeWindow, value)) {
    if (auto parsed_entity_time_window = utils::timeutils::StringToDuration<std::chrono::milliseconds>(value)) {
      entity_tracking_time_window = parsed_entity_time_window.value();
    } else {
      /* An unparsable value is reported and the default window is kept */
      logger_->log_error("Entity Tracking Time Window attribute is invalid");
    }
  }

  /* Check whether we need to invalidate the cache based on the new properties */
  if ((!last_hostname_.empty() && last_hostname_ != common_properties.hostname) ||
      (!last_username_.empty() && last_username_ != common_properties.username) ||
      (!last_remote_path_.empty() && last_remote_path_ != remote_path)) {
    invalidateCache();
  }
  last_hostname_ = common_properties.hostname;
  last_username_ = common_properties.username;
  last_remote_path_ = remote_path;

  /* Get SFTPClient from cache or create it */
  const SFTPProcessorBase::ConnectionCacheKey connection_cache_key = {common_properties.hostname,
                                                                      common_properties.port,
                                                                      common_properties.username,
                                                                      proxy_type_,
                                                                      common_properties.proxy_host,
                                                                      common_properties.proxy_port,
                                                                      common_properties.proxy_username};
  auto client = getOrCreateConnection(connection_cache_key,
                                      common_properties.password,
                                      common_properties.private_key_path,
                                      common_properties.private_key_passphrase,
                                      common_properties.proxy_password);
  if (client == nullptr) {
    context->yield();
    return;
  }

  /*
   * Unless we're sure that the connection is good, we don't want to put it back to the cache.
   * So we will only call this when we're sure that the connection is OK.
   */
  auto put_connection_back_to_cache = [this, &connection_cache_key, &client]() {
    addConnectionToCache(connection_cache_key, std::move(client));
  };

  std::deque<Child> directories;
  std::vector<Child> files;

  /* Add initial directory */
  Child root;
  root.parent_path = remote_path.parent_path();
  root.filename = remote_path.filename();
  root.directory = true;
  directories.emplace_back(std::move(root));

  /* Process directories; the first entry dequeued is always the configured remote path */
  bool listing_root = true;
  while (!directories.empty()) {
    auto directory = std::move(directories.front());
    directories.pop_front();

    std::string new_parent_path;
    if (directory.parent_path.empty()) {
      new_parent_path = directory.filename.generic_string();
    } else {
      new_parent_path = directory.getPath();
    }

    std::vector<std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>> dir_children;
    if (!client->listDirectory(new_parent_path, follow_symlink_, dir_children)) {
      if (listing_root) {
        /*
         * Failing to list the remote path itself must not be treated as an empty listing:
         * skip the listing strategy, and do not return a possibly broken connection to the cache.
         */
        logger_->log_error("Failed to list remote path \"%s\", yielding", new_parent_path.c_str());
        context->yield();
        return;
      }
      /* A subdirectory that cannot be listed is skipped; the rest of the tree is still processed */
      continue;
    }
    listing_root = false;

    for (auto&& dir_child : dir_children) {
      if (filter(new_parent_path, dir_child)) {
        Child child(new_parent_path, std::move(dir_child));
        if (child.directory) {
          directories.emplace_back(std::move(child));
        } else {
          files.emplace_back(std::move(child));
        }
      }
    }
  }

  /* Process the files with the appropriate tracking strategy */
  if (listing_strategy_ == LISTING_STRATEGY_TRACKING_TIMESTAMPS) {
    listByTrackingTimestamps(context, session, common_properties.hostname, common_properties.port, common_properties.username, remote_path.generic_string(), std::move(files));
  } else if (listing_strategy_ == LISTING_STRATEGY_TRACKING_ENTITIES) {
    listByTrackingEntities(context, session, common_properties.hostname, common_properties.port,
                           common_properties.username, remote_path.generic_string(), entity_tracking_time_window, std::move(files));
  } else {
    logger_->log_error("Unknown Listing Strategy: \"%s\"", listing_strategy_.c_str());
    context->yield();
    return;
  }

  put_connection_back_to_cache();
}