in extensions/sftp/processors/ListSFTP.cpp [442:602]
/**
 * Emits FlowFiles for the remote entries in `files` that are considered new according to
 * the timestamps tracked by this processor, then updates and persists that tracking state.
 *
 * Outline:
 *  1. On the first call, try to load the tracking state through updateFromTrackingTimestampsCache().
 *  2. Group the candidate entries by modification time (built from `attrs.mtime`, interpreted as seconds),
 *     keeping only those not older than the last listed / last processed timestamps.
 *  3. Determine the timestamp precision to assume (auto-detected or configured) and the matching
 *     listing lag from LISTING_LAG_MAP.
 *  4. Either yield (lag not yet elapsed, or nothing new for the newest timestamp), or hold back the
 *     newest timestamp group if it is newer than the minimum reliable timestamp.
 *  5. Create and transfer a FlowFile for each remaining entry, skipping entries already recorded in
 *     latest_identifiers_processed_ for the last processed timestamp.
 *  6. Update the tracking members and persist them through persistTrackingTimestampsCache().
 *
 * @param context     used for loading/persisting the tracking state and for yielding
 * @param session     passed on to createAndTransferFlowFileFromChild()
 * @param hostname    remote host; passed on to the cache helpers and to the FlowFile creation
 * @param port        remote port; passed on to the FlowFile creation
 * @param username    remote user; passed on to the cache helpers and to the FlowFile creation
 * @param remote_path listed remote path; passed on to the cache helpers
 * @param files       candidate entries; entries selected for listing are moved out of this vector
 */
void ListSFTP::listByTrackingTimestamps(
    const std::shared_ptr<core::ProcessContext>& context,
    const std::shared_ptr<core::ProcessSession>& session,
    const std::string& hostname,
    uint16_t port,
    const std::string& username,
    const std::string& remote_path,
    std::vector<Child>&& files) {
  /* Snapshot taken before a possible cache load below, exactly as the lower bound was computed originally */
  auto min_timestamp_to_list = last_listed_latest_entry_timestamp_;

  /* Load state from cache file if needed */
  if (!already_loaded_from_cache_) {
    if (updateFromTrackingTimestampsCache(context, hostname, username, remote_path)) {
      logger_->log_debug("Successfully loaded state");
    } else {
      logger_->log_debug("Failed to load state");
    }
    /* Only attempt the load once, regardless of whether it succeeded */
    already_loaded_from_cache_ = true;
  }

  /* The steady clock measures the time elapsed between runs, the system clock is compared to file timestamps */
  std::chrono::steady_clock::time_point current_run_time = std::chrono::steady_clock::now();
  auto now = std::chrono::system_clock::now();

  /* Order children by timestamp and try to detect timestamp precision if needed */
  std::map<std::chrono::system_clock::time_point, std::list<Child>> ordered_files;
  bool target_system_has_seconds = false;
  for (auto&& file : files) {
    std::chrono::system_clock::time_point timestamp{std::chrono::seconds(file.attrs.mtime)};
    /* Any timestamp that is not a whole minute proves that the remote system reports seconds */
    target_system_has_seconds |= std::chrono::round<std::chrono::minutes>(timestamp) != timestamp;

    bool new_file = !min_timestamp_to_list.has_value() || (timestamp >= min_timestamp_to_list && timestamp >= last_processed_latest_entry_timestamp_);
    if (new_file) {
      auto& files_for_timestamp = ordered_files[timestamp];
      files_for_timestamp.emplace_back(std::move(file));
    } else {
      /* file has not been moved from in this branch, so it is still safe to use */
      logger_->log_trace("Skipping \"%s\", because it is not new.", file.getPath().c_str());
    }
  }

  std::optional<std::chrono::system_clock::time_point> latest_listed_entry_timestamp_this_cycle;
  size_t flow_files_created = 0U;
  if (!ordered_files.empty()) {
    /* std::map is ordered by key, so the last element holds the newest timestamp */
    latest_listed_entry_timestamp_this_cycle = ordered_files.crbegin()->first;

    std::string remote_system_timestamp_precision;
    if (target_system_timestamp_precision_ == TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT) {
      if (target_system_has_seconds) {
        logger_->log_debug("Precision auto detection detected second precision");
        remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS;
      } else {
        logger_->log_debug("Precision auto detection detected minute precision");
        remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES;
      }
    } else if (target_system_timestamp_precision_ == TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES) {
      remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES;
    } else {
      /*
       * We only have seconds-precision timestamps, TARGET_SYSTEM_TIMESTAMP_PRECISION_MILLISECONDS makes no real sense here,
       * so we will treat it as TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS.
       */
      remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS;
    }

    std::chrono::milliseconds listing_lag{utils::at(LISTING_LAG_MAP, remote_system_timestamp_precision)};
    logger_->log_debug("The listing lag is %lu ms", listing_lag.count());

    /* If the latest listing time is equal to the last listing time, there are no entries with a newer timestamp than previously seen */
    if (latest_listed_entry_timestamp_this_cycle == last_listed_latest_entry_timestamp_ && latest_listed_entry_timestamp_this_cycle) {
      const auto& latest_files = ordered_files.at(*latest_listed_entry_timestamp_this_cycle);

      auto elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(current_run_time - last_run_time_);
      /* If a precision-specific listing lag has not yet elapsed since our last execution, we wait. */
      if (elapsed_time < listing_lag) {
        logger_->log_debug("The latest listed entry timestamp is the same as the last listed entry timestamp (%lu) "
                           "and the listing lag has not yet elapsed (%lu ms < %lu ms). Yielding.",
                           toUnixTime(latest_listed_entry_timestamp_this_cycle),
                           elapsed_time.count(),
                           listing_lag.count());
        context->yield();
        return;
      }

      /*
       * If we have already processed the entities with the newest timestamp,
       * and there are no new entities with that timestamp, there is nothing to do.
       */
      if (latest_listed_entry_timestamp_this_cycle == last_processed_latest_entry_timestamp_ &&
          std::all_of(latest_files.begin(), latest_files.end(), [this](const Child& child) {
            return latest_identifiers_processed_.count(child.getPath()) == 1U;
          })) {
        logger_->log_debug("The latest listed entry timestamp is the same as the last listed entry timestamp (%lu) "
                           "and all files for that timestamp has been processed. Yielding.", toUnixTime(latest_listed_entry_timestamp_this_cycle));
        context->yield();
        return;
      }
    } else {
      /*
       * Determine the minimum reliable timestamp based on precision.
       * std::chrono::floor returns the truncated time point instead of modifying its argument,
       * so the result has to be assigned back for the truncation to take effect.
       */
      auto minimum_reliable_timestamp = now - listing_lag;
      if (remote_system_timestamp_precision == TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS) {
        minimum_reliable_timestamp = std::chrono::floor<std::chrono::seconds>(minimum_reliable_timestamp);
      } else {
        minimum_reliable_timestamp = std::chrono::floor<std::chrono::minutes>(minimum_reliable_timestamp);
      }

      /* If the latest timestamp is not old enough, we wait another cycle */
      if (latest_listed_entry_timestamp_this_cycle && minimum_reliable_timestamp < latest_listed_entry_timestamp_this_cycle) {
        logger_->log_debug("Skipping files with latest timestamp because their modification date is not smaller than the minimum reliable timestamp: %lu ms >= %lu ms",
                           toUnixTime(latest_listed_entry_timestamp_this_cycle),
                           toUnixTime(minimum_reliable_timestamp));
        ordered_files.erase(*latest_listed_entry_timestamp_this_cycle);
      }
    }

    for (auto& files_for_timestamp : ordered_files) {
      if (files_for_timestamp.first == last_processed_latest_entry_timestamp_) {
        /* Filter out previously processed entities. */
        for (auto it = files_for_timestamp.second.begin(); it != files_for_timestamp.second.end();) {
          if (latest_identifiers_processed_.contains(it->getPath())) {
            it = files_for_timestamp.second.erase(it);
          } else {
            ++it;
          }
        }
      }

      for (const auto& file : files_for_timestamp.second) {
        /* Create the FlowFile for this path */
        if (createAndTransferFlowFileFromChild(session, hostname, port, username, file)) {
          flow_files_created++;
        } else {
          /* Returning here skips the state update and persistence at the end of this function */
          logger_->log_error("Failed to emit FlowFile for \"%s\"", file.filename.generic_string());
          context->yield();
          return;
        }
      }
    }
  }

  /* If we have a listing timestamp, it is worth persisting the state */
  if (latest_listed_entry_timestamp_this_cycle) {
    bool processed_new_files = flow_files_created > 0U;
    if (processed_new_files) {
      /* At least one FlowFile was created, so ordered_files is not empty here */
      auto last_files_it = ordered_files.crbegin();
      if (last_files_it->first != last_processed_latest_entry_timestamp_) {
        /* The newest processed timestamp changed: identifiers remembered for the old one are dropped */
        latest_identifiers_processed_.clear();
      }

      for (const auto& last_file : last_files_it->second) {
        latest_identifiers_processed_.insert(last_file.getPath());
      }

      last_processed_latest_entry_timestamp_ = last_files_it->first;
    }

    last_run_time_ = current_run_time;

    if (latest_listed_entry_timestamp_this_cycle != last_listed_latest_entry_timestamp_ || processed_new_files) {
      last_listed_latest_entry_timestamp_ = latest_listed_entry_timestamp_this_cycle;
      persistTrackingTimestampsCache(context, hostname, username, remote_path);
    }
  } else {
    logger_->log_debug("There are no files to list. Yielding.");
    context->yield();
    return;
  }
}