void ListSFTP::listByTrackingEntities()

in extensions/sftp/processors/ListSFTP.cpp [699:804]


/**
 * Lists `files` using the "tracking entities" strategy.
 *
 * On the first call, the tracked-entity state is loaded once via
 * updateFromTrackingEntitiesCache(). Files whose modification time lies before the
 * tracking window are discarded. Of the remaining files, those that are not tracked
 * yet, have a newer modification time, or have a different size than their tracked
 * entry are emitted as FlowFiles and (re)recorded. Tracked entities older than the
 * window are dropped. Afterwards the state is handed to persistTrackingEntitiesCache().
 * context->yield() is called when there is nothing to do, or when emitting a FlowFile
 * fails (in that case the remaining entities are not emitted and the state is not
 * persisted in this call).
 *
 * @param context  process context; used for yielding and forwarded to the cache helpers
 * @param session  session passed to createAndTransferFlowFileFromChild()
 * @param hostname  remote host; forwarded to FlowFile creation and the cache helpers
 * @param port  remote port; forwarded to FlowFile creation
 * @param username  remote user; forwarded to FlowFile creation and the cache helpers
 * @param remote_path  listed path; forwarded to the cache helpers
 * @param entity_tracking_time_window  how far back from "now" files are considered.
 *        It is ignored during the initial listing when the initial listing target
 *        is ENTITY_TRACKING_INITIAL_LISTING_TARGET_ALL_AVAILABLE.
 * @param files  candidate directory entries; consumed (filtered and moved from) by this call
 */
void ListSFTP::listByTrackingEntities(
    const std::shared_ptr<core::ProcessContext>& context,
    const std::shared_ptr<core::ProcessSession>& session,
    const std::string& hostname,
    uint16_t port,
    const std::string& username,
    const std::string& remote_path,
    std::chrono::milliseconds entity_tracking_time_window,
    std::vector<Child>&& files) {
  /* Load state from cache file if needed */
  if (!already_loaded_from_cache_) {
    if (updateFromTrackingEntitiesCache(context, hostname, username, remote_path)) {
      logger_->log_debug("Successfully loaded state");
      initial_listing_complete_ = true;
    } else {
      logger_->log_debug("Failed to load state");
    }
    already_loaded_from_cache_ = true;
  }

  /* Modification time in milliseconds. Widen to 64 bits before scaling: libssh2 declares
   * mtime as unsigned long, which is only 32 bits wide on some platforms (e.g. Windows),
   * where seconds * 1000 would wrap around. */
  const auto mtime_millis = [](const Child& child) -> uint64_t {
    return static_cast<uint64_t>(child.attrs.mtime) * 1000U;
  };

  /* Lower bound of the tracking window, in epoch milliseconds. There is no lower bound during
   * the initial listing with the "all available" target. The subtraction is done in signed
   * 64-bit arithmetic and clamped at zero, so that a window longer than the current epoch time
   * cannot wrap around into a huge unsigned bound (which would skip every file). */
  uint64_t min_timestamp_to_list = 0U;
  const bool list_all_available = !initial_listing_complete_
      && entity_tracking_initial_listing_target_ == ENTITY_TRACKING_INITIAL_LISTING_TARGET_ALL_AVAILABLE;
  if (!list_all_available) {
    const int64_t now_millis = static_cast<int64_t>(time(nullptr)) * 1000;
    const int64_t window_millis = static_cast<int64_t>(entity_tracking_time_window.count());
    if (now_millis > window_millis) {
      min_timestamp_to_list = static_cast<uint64_t>(now_millis - window_millis);
    }
  }

  /* Skip files not in the tracking window (single erase-remove pass; order is preserved) */
  files.erase(std::remove_if(files.begin(), files.end(), [&](const Child& child) {
    const uint64_t child_timestamp = mtime_millis(child);
    if (child_timestamp >= min_timestamp_to_list) {
      return false;
    }
    // NOTE(review): %lu assumes uint64_t == unsigned long (true on LP64, not on Windows).
    logger_->log_trace("Skipping \"%s\" because it has an older timestamp than the minimum timestamp to list: %lu < %lu",
        child.getPath(), child_timestamp, min_timestamp_to_list);
    return true;
  }), files.end());

  if (files.empty()) {
    logger_->log_debug("No entities to list within the tracking time window");
    context->yield();
    return;
  }

  /* Find files that have been updated: new, newer modification time, or different size.
   * The predicate only reads the element; copy_if moves it into updated_entities afterwards. */
  std::vector<Child> updated_entities;
  std::copy_if(std::make_move_iterator(files.begin()),
               std::make_move_iterator(files.end()),
               std::back_inserter(updated_entities),
               [&](const Child& child) {
    auto already_listed_it = already_listed_entities_.find(child.getPath());
    if (already_listed_it == already_listed_entities_.end()) {
      logger_->log_trace("Found new file \"%s\"", child.getPath());
      return true;
    }

    const uint64_t child_timestamp = mtime_millis(child);
    if (child_timestamp > already_listed_it->second.timestamp) {
      logger_->log_trace("Found file \"%s\" with newer timestamp: %lu -> %lu",
          child.getPath(),
          already_listed_it->second.timestamp,
          child_timestamp);
      return true;
    }

    if (child.attrs.filesize != already_listed_it->second.size) {
      logger_->log_trace("Found file \"%s\" with different size: %lu -> %lu",
                         child.getPath(),
                         already_listed_it->second.size,
                         child.attrs.filesize);
      return true;
    }

    logger_->log_trace("Skipping file \"%s\" because it has not changed", child.getPath());
    return false;
  });

  /* Find entities in the tracking cache that are no longer in the tracking window */
  std::vector<std::string> old_entity_ids;
  for (const auto& already_listed_entity : already_listed_entities_) {
    if (already_listed_entity.second.timestamp < min_timestamp_to_list) {
      old_entity_ids.emplace_back(already_listed_entity.first);
    }
  }

  /* If we have no new files and no expired tracked entities, we have nothing to do */
  if (updated_entities.empty() && old_entity_ids.empty()) {
    context->yield();
    return;
  }

  /* Remove expired entities */
  for (const auto& old_entity_id : old_entity_ids) {
    already_listed_entities_.erase(old_entity_id);
  }

  for (const auto& updated_entity : updated_entities) {
    /* Create the FlowFile for this path */
    if (!createAndTransferFlowFileFromChild(session, hostname, port, username, updated_entity)) {
      logger_->log_error("Failed to emit FlowFile for \"%s\"", updated_entity.getPath());
      context->yield();
      return;
    }
    already_listed_entities_[updated_entity.getPath()] = ListedEntity(mtime_millis(updated_entity), updated_entity.attrs.filesize);
  }

  initial_listing_complete_ = true;

  persistTrackingEntitiesCache(context, hostname, username, remote_path);
}