bool ListSFTP::updateFromTrackingTimestampsCache()

in extensions/sftp/processors/ListSFTP.cpp [340:419]


bool ListSFTP::updateFromTrackingTimestampsCache(core::ProcessContext& /*context*/, const std::string& hostname, const std::string& username, const std::string& remote_path) {
  std::string state_listing_strategy;
  std::string state_hostname;
  std::string state_username;
  std::string state_remote_path;
  uint64_t state_listing_timestamp = 0;
  uint64_t state_processed_timestamp = 0;
  std::set<std::string> state_ids;

  std::unordered_map<std::string, std::string> state_map;
  if (!state_manager_->get(state_map)) {
    logger_->log_info("Found no stored state");
    return false;
  }

  try {
    state_listing_strategy = state_map.at("listing_strategy");
  } catch (...) {
    logger_->log_error("listing_strategy is missing from state");
    return false;
  }
  try {
    state_hostname = state_map.at("hostname");
  } catch (...) {
    logger_->log_error("hostname is missing from state");
    return false;
  }
  try {
    state_username = state_map.at("username");
  } catch (...) {
    logger_->log_error("username is missing from state");
    return false;
  }
  try {
    state_remote_path = state_map.at("remote_path");
  } catch (...) {
    logger_->log_error("remote_path is missing from state");
    return false;
  }
  try {
    state_listing_timestamp = stoull(state_map.at("listing.timestamp"));
  } catch (...) {
    logger_->log_error("listing.timestamp is missing from state or is invalid");
    return false;
  }
  try {
    state_processed_timestamp = stoull(state_map.at("processed.timestamp"));
  } catch (...) {
    logger_->log_error("processed.timestamp is missing from state or is invalid");
    return false;
  }
  for (const auto &kv : state_map) {
    if (kv.first.compare(0, strlen("id."), "id.") == 0) {
      state_ids.emplace(kv.second);
    }
  }

  if (state_listing_strategy != listing_strategy_ ||
      state_hostname != hostname ||
      state_username != username ||
      state_remote_path != remote_path) {
    logger_->log_error(
        "Processor state was persisted with different settings than the current ones, ignoring. "
        "Listing Strategy: \"{}\" vs. \"{}\", "
        "Hostname: \"{}\" vs. \"{}\", "
        "Username: \"{}\" vs. \"{}\", "
        "Remote Path: \"{}\" vs. \"{}\"",
        state_listing_strategy, listing_strategy_,
        state_hostname, hostname,
        state_username, username,
        state_remote_path, remote_path);
    return false;
  }

  last_listed_latest_entry_timestamp_ = fromUnixTime(state_listing_timestamp);
  last_processed_latest_entry_timestamp_ = fromUnixTime(state_processed_timestamp);
  latest_identifiers_processed_ = std::move(state_ids);

  return true;
}