void ListSFTP::listByTrackingTimestamps()

in extensions/sftp/processors/ListSFTP.cpp [421:579]


/**
 * Emits FlowFiles for the remote entries in `files` that are considered new according to the
 * timestamp-tracking state kept in this processor.
 *
 * The tracking state consists of last_listed_latest_entry_timestamp_, last_processed_latest_entry_timestamp_,
 * latest_identifiers_processed_ and last_run_time_. On the first call (already_loaded_from_cache_ == false)
 * the state is loaded through updateFromTrackingTimestampsCache(); whenever the latest listed timestamp changes
 * or new FlowFiles were created, it is written back through persistTrackingTimestampsCache().
 *
 * The function yields the context and returns early when
 *  - nothing newer than the previous listing was found and the listing lag has not elapsed yet,
 *  - every entry with the latest timestamp has already been processed,
 *  - creating a FlowFile for an entry fails,
 *  - no entry qualified as new at all.
 *
 * @param context      process context; used for yielding and passed to the state load/persist helpers
 * @param session      process session passed to createAndTransferFlowFileFromChild()
 * @param hostname     remote host; passed to the FlowFile creation and to the state load/persist helpers
 * @param port         remote port; passed to the FlowFile creation
 * @param username     remote user; passed to the FlowFile creation and to the state load/persist helpers
 * @param remote_path  listed remote path; passed to the state load/persist helpers
 * @param files        listed entries; entries considered new are moved out of this vector
 */
void ListSFTP::listByTrackingTimestamps(
    core::ProcessContext& context,
    core::ProcessSession& session,
    const std::string& hostname,
    uint16_t port,
    const std::string& username,
    const std::string& remote_path,
    std::vector<Child>&& files) {
  /* Load state from cache file if needed */
  if (!already_loaded_from_cache_) {
    if (updateFromTrackingTimestampsCache(context, hostname, username, remote_path)) {
      logger_->log_debug("Successfully loaded state");
    } else {
      logger_->log_debug("Failed to load state");
    }
    already_loaded_from_cache_ = true;
  }

  /*
   * Take the lower bound only after the state load above (which presumably restores the tracking members -
   * NOTE(review): confirm against updateFromTrackingTimestampsCache). Taking it before the load leaves the
   * bound empty on the first run, so every entry counts as new and entries older than the last processed
   * timestamp are emitted again, because the filtering below only covers the last processed timestamp itself.
   */
  const auto min_timestamp_to_list = last_listed_latest_entry_timestamp_;

  /* Monotonic clock for measuring the time between runs, wall clock for comparing against file timestamps */
  std::chrono::steady_clock::time_point current_run_time = std::chrono::steady_clock::now();
  auto now = std::chrono::system_clock::now();

  /* Order children by timestamp and try to detect timestamp precision if needed  */
  std::map<std::chrono::system_clock::time_point, std::list<Child>> ordered_files;
  bool target_system_has_seconds = false;
  for (auto&& file : files) {
    std::chrono::system_clock::time_point timestamp{std::chrono::seconds(file.attrs.mtime)};
    /* A timestamp that is not a whole minute shows that the target system reports seconds */
    target_system_has_seconds |= std::chrono::round<std::chrono::minutes>(timestamp) != timestamp;

    bool new_file = !min_timestamp_to_list.has_value() || (timestamp >= min_timestamp_to_list && timestamp >= last_processed_latest_entry_timestamp_);
    if (new_file) {
      auto& files_for_timestamp = ordered_files[timestamp];
      files_for_timestamp.emplace_back(std::move(file));
    } else {
      logger_->log_trace("Skipping \"{}\", because it is not new.", file.getPath().c_str());
    }
  }

  std::optional<std::chrono::system_clock::time_point> latest_listed_entry_timestamp_this_cycle;
  size_t flow_files_created = 0U;
  if (!ordered_files.empty()) {
    /* The map is ordered by timestamp, so the last key is the newest one */
    latest_listed_entry_timestamp_this_cycle = ordered_files.crbegin()->first;

    std::string remote_system_timestamp_precision;
    if (target_system_timestamp_precision_ == TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT) {
      if (target_system_has_seconds) {
        logger_->log_debug("Precision auto detection detected second precision");
        remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS;
      } else {
        logger_->log_debug("Precision auto detection detected minute precision");
        remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES;
      }
    } else if (target_system_timestamp_precision_ == TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES) {
      remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES;
    } else {
      /*
       * We only have seconds-precision timestamps, TARGET_SYSTEM_TIMESTAMP_PRECISION_MILLISECONDS makes no real sense here,
       * so we will treat it as TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS.
       */
      remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS;
    }
    std::chrono::milliseconds listing_lag{utils::at(LISTING_LAG_MAP, remote_system_timestamp_precision)};
    logger_->log_debug("The listing lag is {}", listing_lag);

    /* If the latest listing time is equal to the last listing time, there are no entries with a newer timestamp than previously seen */
    if (latest_listed_entry_timestamp_this_cycle == last_listed_latest_entry_timestamp_ && latest_listed_entry_timestamp_this_cycle) {
      const auto& latest_files = ordered_files.at(*latest_listed_entry_timestamp_this_cycle);
      auto elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(current_run_time - last_run_time_);
      /* If a precision-specific listing lag has not yet elapsed since our last execution, we wait. */
      if (elapsed_time < listing_lag) {
        logger_->log_debug("The latest listed entry timestamp is the same as the last listed entry timestamp ({}) "
                           "and the listing lag has not yet elapsed ({} < {}). Yielding.",
                           latest_listed_entry_timestamp_this_cycle, elapsed_time, listing_lag);
        context.yield();
        return;
      }
      /*
       * If we have already processed the entities with the newest timestamp,
       * and there are no new entities with that timestamp, there is nothing to do.
       */
      if (latest_listed_entry_timestamp_this_cycle == last_processed_latest_entry_timestamp_ &&
          std::all_of(latest_files.begin(), latest_files.end(), [this](const Child& child) {
            return latest_identifiers_processed_.count(child.getPath()) == 1U;
          })) {
        logger_->log_debug("The latest listed entry timestamp is the same as the last listed entry timestamp ({}) "
                           "and all files for that timestamp has been processed. Yielding.", latest_listed_entry_timestamp_this_cycle);
        context.yield();
        return;
      }
    } else {
      /* Determine the minimum reliable timestamp based on precision */
      auto minimum_reliable_timestamp = now - listing_lag;
      if (remote_system_timestamp_precision == TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS) {
        minimum_reliable_timestamp = std::chrono::floor<std::chrono::seconds>(minimum_reliable_timestamp);
      } else {
        minimum_reliable_timestamp = std::chrono::floor<std::chrono::minutes>(minimum_reliable_timestamp);
      }
      /* If the latest timestamp is not old enough, we wait another cycle */
      if (latest_listed_entry_timestamp_this_cycle && minimum_reliable_timestamp < latest_listed_entry_timestamp_this_cycle) {
        logger_->log_debug("Skipping files with latest timestamp because their modification date is not smaller than the minimum reliable timestamp: {} >= {}",
                           latest_listed_entry_timestamp_this_cycle,
                           minimum_reliable_timestamp);
        ordered_files.erase(*latest_listed_entry_timestamp_this_cycle);
      }
    }

    for (auto& [entry_timestamp, files_for_timestamp] : ordered_files) {
      if (entry_timestamp == last_processed_latest_entry_timestamp_) {
        /* Filter out previously processed entities. */
        files_for_timestamp.remove_if([this](const Child& child) {
          return latest_identifiers_processed_.contains(child.getPath());
        });
      }
      for (const auto& file : files_for_timestamp) {
        /* Create the FlowFile for this path */
        if (createAndTransferFlowFileFromChild(session, hostname, port, username, file)) {
          flow_files_created++;
        } else {
          /* Returning here skips the state update below, so the tracking state is left as it was */
          logger_->log_error("Failed to emit FlowFile for \"{}\"", file.filename.generic_string());
          context.yield();
          return;
        }
      }
    }
  }

  /* If we have a listing timestamp, it is worth persisting the state */
  if (latest_listed_entry_timestamp_this_cycle) {
    bool processed_new_files = flow_files_created > 0U;
    if (processed_new_files) {
      /* At least one FlowFile was created, so ordered_files cannot be empty here */
      auto last_files_it = ordered_files.crbegin();
      /* Identifiers are only kept for the newest processed timestamp; drop them when that timestamp moves on */
      if (last_files_it->first != last_processed_latest_entry_timestamp_) {
        latest_identifiers_processed_.clear();
      }

      for (const auto& last_file : last_files_it->second) {
        latest_identifiers_processed_.insert(last_file.getPath());
      }

      last_processed_latest_entry_timestamp_ = last_files_it->first;
    }

    last_run_time_ = current_run_time;

    if (latest_listed_entry_timestamp_this_cycle != last_listed_latest_entry_timestamp_ || processed_new_files) {
      last_listed_latest_entry_timestamp_ = latest_listed_entry_timestamp_this_cycle;
      persistTrackingTimestampsCache(context, hostname, username, remote_path);
    }
  } else {
    logger_->log_debug("There are no files to list. Yielding.");
    context.yield();
    return;
  }
}