bool PutSFTP::processOne()

in extensions/sftp/processors/PutSFTP.cpp [86:346]


/**
 * Takes at most one flow file from the session and tries to upload its content over SFTP.
 *
 * The steps are, in order:
 *  1. parse the common and the processor-specific properties,
 *  2. optionally reject zero-byte flow files,
 *  3. obtain an SFTP client (cached or freshly created),
 *  4. apply the configured conflict resolution strategy if a remote entry with the same name exists,
 *  5. optionally create the remote directory hierarchy,
 *  6. upload the content (to a temporary or dot-prefixed name if configured) and rename it to its final name,
 *  7. optionally set mtime/atime, permissions, owner and group on the uploaded file.
 *
 * The client is handed back to the connection cache only on the paths where the connection is
 * believed to be healthy; on paths where an SFTP operation failed unexpectedly it is deliberately
 * not returned to the cache.
 *
 * @param context process context used to read properties and to yield
 * @param session process session the flow file is read from and transferred through
 * @return false if there was no flow file to process or the processor yielded without
 *         transferring the flow file; true if the flow file has been transferred to
 *         Success, Reject or Failure
 */
bool PutSFTP::processOne(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
  auto flow_file = session->get();
  if (flow_file == nullptr) {
    return false;
  }

  /* Parse common properties */
  SFTPProcessorBase::CommonProperties common_properties;
  if (!parseCommonPropertiesOnTrigger(context, flow_file, common_properties)) {
    context->yield();
    return false;
  }

  /* Parse processor-specific properties */
  std::filesystem::path filename;
  std::filesystem::path remote_path;
  bool disable_directory_listing = false;
  std::string temp_file_name;
  std::optional<std::chrono::system_clock::time_point> last_modified;
  bool permissions_set = false;
  uint32_t permissions = 0U;
  bool remote_owner_set = false;
  uint64_t remote_owner = 0U;
  bool remote_group_set = false;
  uint64_t remote_group = 0U;

  if (auto file_name_str = flow_file->getAttribute(core::SpecialFlowAttribute::FILENAME)) {
    filename = *file_name_str;
  }

  std::string value;
  if (auto remote_path_str = context->getProperty(RemotePath, flow_file)) {
    remote_path = std::filesystem::path(*remote_path_str, std::filesystem::path::format::generic_format).lexically_normal();
    /* Strip trailing separators: an empty filename() component means the path ends with a separator */
    while (remote_path.filename().empty() && !remote_path.empty()) {
      remote_path = remote_path.parent_path();
    }
    if (remote_path.empty()) {
      remote_path = ".";
    }
  }

  /* The dynamic property, if present, is consulted before the regular one */
  if (context->getDynamicProperty(std::string{DisableDirectoryListing.name}, value) ||
      context->getProperty(DisableDirectoryListing, value)) {
    disable_directory_listing = utils::StringUtils::toBool(value).value_or(false);
  }
  context->getProperty(TempFilename, temp_file_name, flow_file);
  if (context->getProperty(LastModifiedTime, value, flow_file)) {
    last_modified = utils::timeutils::parseDateTimeStr(value);
    if (!last_modified) {
      logger_->log_warn("Failed to parse Last Modified Time \"%s\", it will not be set on the remote file", value);
    }
  }

  /* Unparsable attribute values are not fatal: the corresponding attribute is simply left unset */
  if (context->getProperty(Permissions, value, flow_file)) {
    if (core::Property::StringToPermissions(value, permissions)) {
      permissions_set = true;
    } else {
      logger_->log_warn("Failed to parse Permissions \"%s\", they will not be set on the remote file", value);
    }
  }
  if (context->getProperty(RemoteOwner, value, flow_file)) {
    if (core::Property::StringToInt(value, remote_owner)) {
      remote_owner_set = true;
    } else {
      logger_->log_warn("Failed to parse Remote Owner \"%s\", it will not be set on the remote file", value);
    }
  }
  if (context->getProperty(RemoteGroup, value, flow_file)) {
    if (core::Property::StringToInt(value, remote_group)) {
      remote_group_set = true;
    } else {
      logger_->log_warn("Failed to parse Remote Group \"%s\", it will not be set on the remote file", value);
    }
  }

  /* Reject zero-byte files if needed */
  if (reject_zero_byte_ && flow_file->getSize() == 0U) {
    logger_->log_debug("Rejecting %s because it is zero bytes", filename.generic_string());
    session->transfer(flow_file, Reject);
    return true;
  }

  /* Get SFTPClient from cache or create it */
  const SFTPProcessorBase::ConnectionCacheKey connection_cache_key = {common_properties.hostname,
                                                                      common_properties.port,
                                                                      common_properties.username,
                                                                      proxy_type_,
                                                                      common_properties.proxy_host,
                                                                      common_properties.proxy_port,
                                                                      common_properties.proxy_username};
  auto client = getOrCreateConnection(connection_cache_key,
                                      common_properties.password,
                                      common_properties.private_key_path,
                                      common_properties.private_key_passphrase,
                                      common_properties.proxy_password);
  if (client == nullptr) {
    context->yield();
    return false;
  }

  /*
   * Unless we're sure that the connection is good, we don't want to put it back to the cache.
   * So we will only call this when we're sure that the connection is OK.
   * The client is moved out by this call, so it must be the last use of `client` on any path.
   */
  auto put_connection_back_to_cache = [this, &connection_cache_key, &client]() {
    addConnectionToCache(connection_cache_key, std::move(client));
  };

  /* Try to detect conflicts if needed */
  std::string resolved_filename = filename.generic_string();
  if (conflict_resolution_ != CONFLICT_RESOLUTION_NONE) {
    const auto conflict_check_path = (remote_path / filename).generic_string();
    LIBSSH2_SFTP_ATTRIBUTES attrs{};
    if (!client->stat(conflict_check_path, true /*follow_symlinks*/, attrs)) {
      /* A missing file means there is no conflict; any other stat error is a failure */
      if (client->getLastError() != utils::SFTPError::FileDoesNotExist) {
        logger_->log_error("Failed to stat %s", conflict_check_path.c_str());
        session->transfer(flow_file, Failure);
        return true;
      }
    } else {
      if ((attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS) && LIBSSH2_SFTP_S_ISDIR(attrs.permissions)) {
        logger_->log_error("Rejecting %s because a directory with the same name already exists", filename.generic_string());
        session->transfer(flow_file, Reject);
        put_connection_back_to_cache();
        return true;
      }
      logger_->log_debug("Found file with the same name as the target file: %s", filename.generic_string());
      if (conflict_resolution_ == CONFLICT_RESOLUTION_IGNORE) {
        logger_->log_debug("Routing %s to SUCCESS despite a file with the same name already existing", filename.generic_string());
        session->transfer(flow_file, Success);
        put_connection_back_to_cache();
        return true;
      } else if (conflict_resolution_ == CONFLICT_RESOLUTION_REJECT) {
        logger_->log_debug("Routing %s to REJECT because a file with the same name already exists", filename.generic_string());
        session->transfer(flow_file, Reject);
        put_connection_back_to_cache();
        return true;
      } else if (conflict_resolution_ == CONFLICT_RESOLUTION_FAIL) {
        logger_->log_debug("Routing %s to FAILURE because a file with the same name already exists", filename.generic_string());
        session->transfer(flow_file, Failure);
        put_connection_back_to_cache();
        return true;
      } else if (conflict_resolution_ == CONFLICT_RESOLUTION_RENAME) {
        /* Probe "1.<name>" ... "99.<name>" and take the first one that does not exist remotely */
        std::string possible_resolved_filename;
        bool unique_name_generated = false;
        for (int i = 1; i < 100; i++) {
          possible_resolved_filename = std::to_string(i) + "." + filename.generic_string();
          auto possible_resolved_path = (remote_path / possible_resolved_filename).generic_string();
          if (!client->stat(possible_resolved_path, true /*follow_symlinks*/, attrs)) {
            if (client->getLastError() == utils::SFTPError::FileDoesNotExist) {
              unique_name_generated = true;
              break;
            } else {
              logger_->log_error("Failed to stat %s", possible_resolved_path.c_str());
              session->transfer(flow_file, Failure);
              return true;
            }
          }
        }
        if (unique_name_generated) {
          logger_->log_debug("Resolved %s to %s", filename.generic_string(), possible_resolved_filename);
          resolved_filename = std::move(possible_resolved_filename);
        } else {
          logger_->log_error("Rejecting %s because a unique name could not be determined after 99 attempts", filename.generic_string());
          session->transfer(flow_file, Reject);
          put_connection_back_to_cache();
          return true;
        }
      }
      /* Any other strategy (e.g. replace) falls through to the upload below */
    }
  }

  /* Create remote directory if needed */
  if (create_directory_) {
    auto res = createDirectoryHierarchy(*client, remote_path.generic_string(), disable_directory_listing);
    switch (res) {
      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_OK:
        break;
      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_STAT_FAILED:
        context->yield();
        return false;
      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_A_DIRECTORY:
      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_FOUND:
      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_PERMISSION_DENIED:
        session->transfer(flow_file, Failure);
        put_connection_back_to_cache();
        return true;
      default:
        logger_->log_error("Unknown createDirectoryHierarchy result: %hhu", static_cast<uint8_t>(res));
        context->yield();
        return false;
    }
  }

  /*
   * Upload file.
   * The upload goes to the configured temporary name, or to a dot-prefixed name if dot rename is enabled,
   * otherwise directly to the final name.
   */
  auto target_path = remote_path;
  if (!IsNullOrEmpty(temp_file_name)) {
    target_path /= temp_file_name;
  } else if (dot_rename_) {
    target_path /= "." + resolved_filename;
  } else {
    target_path /= resolved_filename;
  }

  std::string final_target_path = (remote_path / resolved_filename).generic_string();
  /* path::c_str() yields the native character type, which is not necessarily char, so log the generic string */
  logger_->log_debug("The target path is %s, final target path is %s", target_path.generic_string(), final_target_path);

  try {
    session->read(flow_file, [&client, &target_path, this](const std::shared_ptr<io::InputStream>& stream) {
      if (!client->putFile(target_path.generic_string(),
          *stream,
          conflict_resolution_ == CONFLICT_RESOLUTION_REPLACE /*overwrite*/,
          gsl::narrow<int64_t>(stream->size()) /*expected_size*/)) {
        throw utils::SFTPException{client->getLastError()};
      }
      return gsl::narrow<int64_t>(stream->size());
    });
  } catch (const utils::SFTPException& ex) {
    /* Never use externally supplied text as the format string itself */
    logger_->log_debug("%s", ex.what());
    session->transfer(flow_file, Failure);
    return true;
  }

  /* Move file to its final place */
  if (target_path != final_target_path) {
    if (!client->rename(target_path.generic_string(), final_target_path, conflict_resolution_ == CONFLICT_RESOLUTION_REPLACE /*overwrite*/)) {
      logger_->log_error("Failed to move temporary file %s to final path %s", target_path.generic_string(), final_target_path);
      /* Best-effort cleanup of the temporary upload */
      if (!client->removeFile(target_path.generic_string())) {
        logger_->log_error("Failed to remove temporary file %s", target_path.generic_string());
      }
      session->transfer(flow_file, Failure);
      return true;
    }
  }

  /* Set file attributes if needed */
  if (last_modified ||
      permissions_set ||
      remote_owner_set ||
      remote_group_set) {
    utils::SFTPClient::SFTPAttributes attrs{};
    attrs.flags = 0U;
    if (last_modified) {
      /*
       * NiFi doesn't set atime, only mtime, but because they can only be set together,
       * if we don't want to modify atime, we first have to get it.
       * Therefore setting them both saves an extra protocol round.
       */
      const auto seconds_since_epoch = std::chrono::duration_cast<std::chrono::seconds>(last_modified->time_since_epoch()).count();
      attrs.flags |= utils::SFTPClient::SFTP_ATTRIBUTE_MTIME | utils::SFTPClient::SFTP_ATTRIBUTE_ATIME;
      attrs.mtime = seconds_since_epoch;
      attrs.atime = seconds_since_epoch;
    }
    if (permissions_set) {
      attrs.flags |= utils::SFTPClient::SFTP_ATTRIBUTE_PERMISSIONS;
      attrs.permissions = permissions;
    }
    if (remote_owner_set) {
      attrs.flags |= utils::SFTPClient::SFTP_ATTRIBUTE_UID;
      attrs.uid = remote_owner;
    }
    if (remote_group_set) {
      attrs.flags |= utils::SFTPClient::SFTP_ATTRIBUTE_GID;
      attrs.gid = remote_group;
    }
    if (!client->setAttributes(final_target_path, attrs)) {
      /* This is not fatal, just log a warning (for the path the attributes were actually applied to) */
      logger_->log_warn("Failed to set file attributes for %s", final_target_path);
    }
  }

  session->transfer(flow_file, Success);
  put_connection_back_to_cache();
  return true;
}