in extensions/sftp/processors/ListSFTP.cpp [676:781]
/*
 * Emits FlowFiles for the entries of `files` that are new or have changed since they were last listed,
 * using the in-memory tracking-entities cache (loaded once from the persisted cache on first use).
 *
 * - Entries whose mtime falls before the tracking window are dropped. During the initial listing with the
 *   "all available" target, no entry is dropped.
 * - An entry counts as updated if it has not been listed before, has a newer mtime, or has a different size.
 * - Cache entries older than the tracking window are evicted.
 * - If emitting any FlowFile fails, the processor yields and returns without persisting the cache.
 *
 * `files` is consumed: its elements may be moved from by the time this function returns.
 */
void ListSFTP::listByTrackingEntities(
    core::ProcessContext& context,
    core::ProcessSession& session,
    const std::string& hostname,
    uint16_t port,
    const std::string& username,
    const std::string& remote_path,
    std::chrono::milliseconds entity_tracking_time_window,
    std::vector<Child>&& files) {
  /* Load state from cache file if needed */
  if (!already_loaded_from_cache_) {
    if (updateFromTrackingEntitiesCache(context, hostname, username, remote_path)) {
      logger_->log_debug("Successfully loaded state");
      initial_listing_complete_ = true;
    } else {
      logger_->log_debug("Failed to load state");
    }
    already_loaded_from_cache_ = true;
  }

  // Widen before scaling to milliseconds: the SFTP attribute mtime is presumably libssh2's `unsigned long`,
  // which is only 32 bits on LLP64 platforms (e.g. Windows), where `mtime * 1000` would overflow.
  const auto to_millis = [](const Child& child) -> uint64_t {
    return static_cast<uint64_t>(child.attrs.mtime) * 1000U;
  };

  const bool list_all_available = !initial_listing_complete_
      && entity_tracking_initial_listing_target_ == ENTITY_TRACKING_INITIAL_LISTING_TARGET_ALL_AVAILABLE;
  uint64_t min_timestamp_to_list = 0U;
  if (!list_all_available) {
    const int64_t now_ms = static_cast<int64_t>(time(nullptr)) * 1000;
    const int64_t window_ms = entity_tracking_time_window.count();
    // Clamp at 0 so a window reaching back before the epoch cannot wrap around to a huge unsigned minimum.
    min_timestamp_to_list = now_ms > window_ms ? static_cast<uint64_t>(now_ms - window_ms) : 0U;
  }

  /* Skip files not in the tracking window (single pass instead of repeated vector::erase) */
  files.erase(std::remove_if(files.begin(), files.end(), [&](const Child& child) {
    const uint64_t mtime_ms = to_millis(child);
    if (mtime_ms >= min_timestamp_to_list) {
      return false;
    }
    logger_->log_trace("Skipping \"{}\" because it has an older timestamp than the minimum timestamp to list: {} < {}",
        child.getPath(), mtime_ms, min_timestamp_to_list);
    return true;
  }), files.end());
  if (files.empty()) {
    logger_->log_debug("No entities to list within the tracking time window");
    context.yield();
    return;
  }

  /* Find files that have been updated */
  const auto is_new_or_changed = [&](const Child& child) {
    const auto already_listed_it = already_listed_entities_.find(child.getPath());
    if (already_listed_it == already_listed_entities_.end()) {
      logger_->log_trace("Found new file \"{}\"", child.getPath());
      return true;
    }
    const uint64_t mtime_ms = to_millis(child);
    if (mtime_ms > already_listed_it->second.timestamp) {
      logger_->log_trace("Found file \"{}\" with newer timestamp: {} -> {}",
          child.getPath(),
          already_listed_it->second.timestamp,
          mtime_ms);
      return true;
    }
    if (child.attrs.filesize != already_listed_it->second.size) {
      logger_->log_trace("Found file \"{}\" with different size: {} -> {}",
          child.getPath(),
          already_listed_it->second.size,
          child.attrs.filesize);
      return true;
    }
    logger_->log_trace("Skipping file \"{}\" because it has not changed", child.getPath());
    return false;
  };
  std::vector<Child> updated_entities;
  for (auto& child : files) {
    if (is_new_or_changed(child)) {
      updated_entities.push_back(std::move(child));
    }
  }

  /* Find entities in the tracking cache that are no longer in the tracking window */
  std::vector<std::string> old_entity_ids;
  for (const auto& already_listed_entity : already_listed_entities_) {
    if (already_listed_entity.second.timestamp < min_timestamp_to_list) {
      old_entity_ids.emplace_back(already_listed_entity.first);
    }
  }

  /* If we have no new files and no expired tracked entities, we have nothing to do */
  if (updated_entities.empty() && old_entity_ids.empty()) {
    context.yield();
    return;
  }

  /* Remove expired entities */
  for (const auto& old_entity_id : old_entity_ids) {
    already_listed_entities_.erase(old_entity_id);
  }

  for (const auto& updated_entity : updated_entities) {
    /* Create the FlowFile for this path */
    if (!createAndTransferFlowFileFromChild(session, hostname, port, username, updated_entity)) {
      logger_->log_error("Failed to emit FlowFile for \"{}\"", updated_entity.getPath());
      context.yield();
      return;
    }
    already_listed_entities_[updated_entity.getPath()] = ListedEntity(to_millis(updated_entity), updated_entity.attrs.filesize);
  }

  initial_listing_complete_ = true;
  persistTrackingEntitiesCache(context, hostname, username, remote_path);
}