void ListSFTP::onTrigger()

in extensions/sftp/processors/ListSFTP.cpp [783:895]


/**
 * Lists the configured remote directory tree over SFTP and passes the files that were found
 * to the configured listing strategy.
 *
 * Flow of one trigger:
 *  1. Parse the common connection properties, the Remote Path (trailing slashes removed) and
 *     the Entity Tracking Time Window (falls back to 3 hours when unset or unparsable).
 *  2. Call invalidateCache() if the hostname, username or remote path differs from the
 *     non-empty value remembered from the previous trigger.
 *  3. Obtain an SFTP client via getOrCreateConnection().
 *  4. Walk the directory tree breadth-first, keeping only the entries accepted by filter().
 *     Directories that cannot be listed are skipped.
 *  5. Dispatch the collected files to the timestamp- or entity-tracking implementation and,
 *     afterwards, return the client to the connection cache.
 *
 * The context is yielded, and the function returns early, when the common properties cannot be
 * parsed, when no client could be obtained, or when the listing strategy is not recognised.
 * On those early returns the client is not handed back to the connection cache.
 *
 * @param context process context used for property lookup, configuration access and yielding
 * @param session process session forwarded to the listing strategy implementation
 */
void ListSFTP::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
  /* Connection-level properties shared by the SFTP processors */
  SFTPProcessorBase::CommonProperties common_properties;
  if (!parseCommonPropertiesOnTrigger(context, nullptr /*flow_file*/, common_properties)) {
    context.yield();
    return;
  }

  /* Normalise the remote path: drop trailing slashes, but never shrink it below one character */
  std::string raw_remote_path = context.getProperty(RemotePath).value_or("");
  while (raw_remote_path.size() > 1 && raw_remote_path.back() == '/') {
    raw_remote_path.pop_back();
  }
  std::filesystem::path remote_path{raw_remote_path, std::filesystem::path::format::generic_format};

  /* Entity tracking window: 3 hours unless the property holds a parsable duration */
  std::chrono::milliseconds tracking_window = std::chrono::hours{3};
  if (const auto tracking_window_property = context.getProperty(EntityTrackingTimeWindow)) {
    auto parsed_window = utils::timeutils::StringToDuration<std::chrono::milliseconds>(*tracking_window_property);
    if (parsed_window) {
      tracking_window = parsed_window.value();
    } else {
      logger_->log_error("Entity Tracking Time Window attribute is invalid");
    }
  }

  /* A change in any previously remembered (non-empty) target setting invalidates the cache */
  const bool hostname_changed = !last_hostname_.empty() && last_hostname_ != common_properties.hostname;
  const bool username_changed = !last_username_.empty() && last_username_ != common_properties.username;
  const bool remote_path_changed = !last_remote_path_.empty() && last_remote_path_ != remote_path;
  if (hostname_changed || username_changed || remote_path_changed) {
    invalidateCache();
  }
  last_hostname_ = common_properties.hostname;
  last_username_ = common_properties.username;
  last_remote_path_ = remote_path;

  /* Look up a cached SFTP client for this endpoint, or create a new one */
  const SFTPProcessorBase::ConnectionCacheKey connection_cache_key = {
      common_properties.hostname,
      common_properties.port,
      common_properties.username,
      proxy_type_,
      common_properties.proxy_host,
      common_properties.proxy_port,
      common_properties.proxy_username};
  const auto buffer_size = utils::configuration::getBufferSize(*context.getConfiguration());
  auto client = getOrCreateConnection(
      connection_cache_key,
      common_properties.password,
      common_properties.private_key_path,
      common_properties.private_key_passphrase,
      common_properties.proxy_password,
      buffer_size);
  if (!client) {
    context.yield();
    return;
  }

  using DirectoryEntry = std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>;

  std::deque<Child> pending_directories;
  std::vector<Child> found_files;

  /* Seed the traversal with the remote path itself */
  Child start_directory;
  start_directory.parent_path = remote_path.parent_path();
  start_directory.filename = remote_path.filename();
  start_directory.directory = true;
  pending_directories.emplace_back(std::move(start_directory));

  /* FIFO traversal: each listed subdirectory is queued behind the ones already waiting */
  while (!pending_directories.empty()) {
    auto current = std::move(pending_directories.front());
    pending_directories.pop_front();

    std::string listing_path;
    if (!current.parent_path.empty()) {
      listing_path = current.getPath();
    } else {
      listing_path = current.filename.generic_string();
    }

    std::vector<DirectoryEntry> entries;
    if (!client->listDirectory(listing_path, follow_symlink_, entries)) {
      /* This directory could not be listed; carry on with the remaining ones */
      continue;
    }

    for (auto& entry : entries) {
      if (!filter(listing_path, entry)) {
        continue;
      }
      Child child(listing_path, std::move(entry));
      if (child.directory) {
        pending_directories.emplace_back(std::move(child));
      } else {
        found_files.emplace_back(std::move(child));
      }
    }
  }

  /* Hand the collected files to the implementation matching the listing strategy */
  const bool track_timestamps = listing_strategy_ == LISTING_STRATEGY_TRACKING_TIMESTAMPS;
  const bool track_entities = !track_timestamps && listing_strategy_ == LISTING_STRATEGY_TRACKING_ENTITIES;
  if (!track_timestamps && !track_entities) {
    logger_->log_error("Unknown Listing Strategy: \"{}\"", listing_strategy_.c_str());
    context.yield();
    return;
  }

  if (track_timestamps) {
    listByTrackingTimestamps(context, session, common_properties.hostname, common_properties.port,
        common_properties.username, remote_path.generic_string(), std::move(found_files));
  } else {
    listByTrackingEntities(context, session, common_properties.hostname, common_properties.port,
        common_properties.username, remote_path.generic_string(), tracking_window, std::move(found_files));
  }

  /*
   * The connection only goes back to the cache here, once listing and dispatch have completed;
   * every early return above leaves it out of the cache.
   */
  addConnectionToCache(connection_cache_key, std::move(client));
}