extensions/sftp/processors/PutSFTP.cpp (292 lines of code) (raw):
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PutSFTP.h"
#include <memory>
#include <cstdint>
#include <iostream>
#include <limits>
#include <string>
#include <utility>
#include "core/FlowFile.h"
#include "core/logging/Logger.h"
#include "core/ProcessContext.h"
#include "core/Resource.h"
#include "io/BufferStream.h"
#include "io/StreamFactory.h"
#include "utils/StringUtils.h"
#include "utils/file/FileUtils.h"
namespace org::apache::nifi::minifi::processors {
void PutSFTP::initialize() {
logger_->log_trace("Initializing PutSFTP");
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
PutSFTP::PutSFTP(std::string name, const utils::Identifier& uuid /*= utils::Identifier()*/)
: SFTPProcessorBase(std::move(name), uuid),
create_directory_(false),
batch_size_(0),
reject_zero_byte_(false),
dot_rename_(false) {
logger_ = core::logging::LoggerFactory<PutSFTP>::getLogger(uuid_);
}
/** Uses the compiler-generated destructor; it is defined out of line here. */
PutSFTP::~PutSFTP() = default;
/**
 * Reads the configuration that does not depend on an individual flow file and
 * caches it in member variables, then starts the keepalive thread if needed.
 *
 * Problems with Create Directory, Batch Size and Use Compression are logged as
 * errors and the previously held value is kept; nothing is thrown from here.
 *
 * @param context process context the property values are read from
 */
void PutSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
  parseCommonPropertiesOnSchedule(context);

  std::string value;
  if (!context->getProperty(CreateDirectory, value)) {
    logger_->log_error("Create Directory attribute is missing or invalid");
  } else {
    create_directory_ = utils::StringUtils::toBool(value).value_or(false);
  }

  /* A value that is present but cannot be parsed is reported as well instead of being silently ignored */
  if (!context->getProperty(BatchSize, value) || !core::Property::StringToInt(value, batch_size_)) {
    logger_->log_error("Batch Size attribute is missing or invalid");
  }

  context->getProperty(ConflictResolution, conflict_resolution_);

  /* For the next two options an unparsable boolean counts as "true" */
  if (context->getProperty(RejectZeroByte, value)) {
    reject_zero_byte_ = utils::StringUtils::toBool(value).value_or(true);
  }
  if (context->getProperty(DotRename, value)) {
    dot_rename_ = utils::StringUtils::toBool(value).value_or(true);
  }

  if (!context->getProperty(UseCompression, value)) {
    logger_->log_error("Use Compression attribute is missing or invalid");
  } else {
    use_compression_ = utils::StringUtils::toBool(value).value_or(false);
  }

  startKeepaliveThreadIfNeeded();
}
/**
 * Takes one flow file from the session and uploads its content to the SFTP server.
 *
 * The steps are, in order:
 *  1. evaluate the common and the PutSFTP-specific properties for the flow file,
 *  2. route zero-byte flow files to Reject if configured to do so,
 *  3. get a connection from the cache or create a new one,
 *  4. apply the configured conflict resolution if the target already exists,
 *  5. create the remote directory hierarchy if configured to do so,
 *  6. upload to a temporary / dot-prefixed name (if any) and rename to the final name,
 *  7. set mtime/atime, permissions, owner and group on the final file if requested.
 *
 * The connection is put back into the cache only on paths where the connection
 * is known to be usable; after an unexpected SFTP error it is not put back.
 *
 * @param context process context used to evaluate properties and to yield
 * @param session process session the flow file is taken from and routed through
 * @return true if a flow file was routed to Success, Reject or Failure;
 *         false if there was no flow file, or if property parsing, connecting or
 *         directory creation failed in a way that made the processor yield
 */
bool PutSFTP::processOne(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
  auto flow_file = session->get();
  if (flow_file == nullptr) {
    return false;
  }

  /* Parse common properties */
  SFTPProcessorBase::CommonProperties common_properties;
  if (!parseCommonPropertiesOnTrigger(context, flow_file, common_properties)) {
    context->yield();
    return false;
  }

  /* Parse processor-specific properties */
  std::filesystem::path filename;
  std::filesystem::path remote_path;
  bool disable_directory_listing = false;
  std::string temp_file_name;
  std::optional<std::chrono::system_clock::time_point> last_modified;
  bool permissions_set = false;
  uint32_t permissions = 0U;
  bool remote_owner_set = false;
  uint64_t remote_owner = 0U;
  bool remote_group_set = false;
  uint64_t remote_group = 0U;

  if (auto file_name_str = flow_file->getAttribute(core::SpecialFlowAttribute::FILENAME))
    filename = *file_name_str;

  std::string value;
  if (auto remote_path_str = context->getProperty(RemotePath, flow_file)) {
    remote_path = std::filesystem::path(*remote_path_str, std::filesystem::path::format::generic_format).lexically_normal();
    /* Strip trailing separators; an empty result means the current directory */
    while (remote_path.filename().empty() && !remote_path.empty())
      remote_path = remote_path.parent_path();
    if (remote_path.empty())
      remote_path = ".";
  }

  /* A dynamic property with the same name takes precedence over the regular one */
  if (context->getDynamicProperty(std::string{DisableDirectoryListing.name}, value) ||
      context->getProperty(DisableDirectoryListing, value)) {
    disable_directory_listing = utils::StringUtils::toBool(value).value_or(false);
  }

  context->getProperty(TempFilename, temp_file_name, flow_file);

  if (context->getProperty(LastModifiedTime, value, flow_file))
    last_modified = utils::timeutils::parseDateTimeStr(value);

  if (context->getProperty(Permissions, value, flow_file)) {
    if (core::Property::StringToPermissions(value, permissions)) {
      permissions_set = true;
    }
  }
  if (context->getProperty(RemoteOwner, value, flow_file)) {
    if (core::Property::StringToInt(value, remote_owner)) {
      remote_owner_set = true;
    }
  }
  if (context->getProperty(RemoteGroup, value, flow_file)) {
    if (core::Property::StringToInt(value, remote_group)) {
      remote_group_set = true;
    }
  }

  /* Reject zero-byte files if needed */
  if (reject_zero_byte_ && flow_file->getSize() == 0U) {
    logger_->log_debug("Rejecting %s because it is zero bytes", filename.generic_string());
    session->transfer(flow_file, Reject);
    return true;
  }

  /* Get SFTPClient from cache or create it */
  const SFTPProcessorBase::ConnectionCacheKey connection_cache_key = {common_properties.hostname,
                                                                      common_properties.port,
                                                                      common_properties.username,
                                                                      proxy_type_,
                                                                      common_properties.proxy_host,
                                                                      common_properties.proxy_port,
                                                                      common_properties.proxy_username};
  auto client = getOrCreateConnection(connection_cache_key,
                                      common_properties.password,
                                      common_properties.private_key_path,
                                      common_properties.private_key_passphrase,
                                      common_properties.proxy_password);
  if (client == nullptr) {
    context->yield();
    return false;
  }

  /*
   * Unless we're sure that the connection is good, we don't want to put it back to the cache.
   * So we will only call this when we're sure that the connection is OK.
   * Note that calling it moves from `client`, so it must be the last use of the client on a path.
   */
  auto put_connection_back_to_cache = [this, &connection_cache_key, &client]() {
    addConnectionToCache(connection_cache_key, std::move(client));
  };

  /* Try to detect conflicts if needed */
  std::string resolved_filename = filename.generic_string();
  if (conflict_resolution_ != CONFLICT_RESOLUTION_NONE) {
    auto existing_path = (remote_path / filename).generic_string();
    LIBSSH2_SFTP_ATTRIBUTES attrs;
    if (!client->stat(existing_path, true /*follow_symlinks*/, attrs)) {
      /* "File does not exist" means there is no conflict; anything else is a real error */
      if (client->getLastError() != utils::SFTPError::FileDoesNotExist) {
        logger_->log_error("Failed to stat %s", existing_path.c_str());
        session->transfer(flow_file, Failure);
        return true;
      }
    } else {
      if ((attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS) && LIBSSH2_SFTP_S_ISDIR(attrs.permissions)) {
        logger_->log_error("Rejecting %s because a directory with the same name already exists", filename.generic_string());
        session->transfer(flow_file, Reject);
        put_connection_back_to_cache();
        return true;
      }
      logger_->log_debug("Found file with the same name as the target file: %s", filename.generic_string());
      if (conflict_resolution_ == CONFLICT_RESOLUTION_IGNORE) {
        logger_->log_debug("Routing %s to SUCCESS despite a file with the same name already existing", filename.generic_string());
        session->transfer(flow_file, Success);
        put_connection_back_to_cache();
        return true;
      } else if (conflict_resolution_ == CONFLICT_RESOLUTION_REJECT) {
        logger_->log_debug("Routing %s to REJECT because a file with the same name already exists", filename.generic_string());
        session->transfer(flow_file, Reject);
        put_connection_back_to_cache();
        return true;
      } else if (conflict_resolution_ == CONFLICT_RESOLUTION_FAIL) {
        logger_->log_debug("Routing %s to FAILURE because a file with the same name already exists", filename.generic_string());
        session->transfer(flow_file, Failure);
        put_connection_back_to_cache();
        return true;
      } else if (conflict_resolution_ == CONFLICT_RESOLUTION_RENAME) {
        /* Probe "1.<name>" ... "99.<name>" and take the first one that does not exist yet */
        std::string possible_resolved_filename;
        bool unique_name_generated = false;
        for (int i = 1; i < 100; i++) {
          possible_resolved_filename = std::to_string(i) + "." + filename.generic_string();
          auto possible_resolved_path = (remote_path / possible_resolved_filename).generic_string();
          if (!client->stat(possible_resolved_path, true /*follow_symlinks*/, attrs)) {
            if (client->getLastError() == utils::SFTPError::FileDoesNotExist) {
              unique_name_generated = true;
              break;
            } else {
              logger_->log_error("Failed to stat %s", possible_resolved_path.c_str());
              session->transfer(flow_file, Failure);
              return true;
            }
          }
        }
        if (unique_name_generated) {
          logger_->log_debug("Resolved %s to %s", filename.generic_string(), possible_resolved_filename);
          resolved_filename = std::move(possible_resolved_filename);
        } else {
          logger_->log_error("Rejecting %s because a unique name could not be determined after 99 attempts", filename.generic_string());
          session->transfer(flow_file, Reject);
          put_connection_back_to_cache();
          return true;
        }
      }
      /* Any other conflict resolution (e.g. replace) falls through to the upload below */
    }
  }

  /* Create remote directory if needed */
  if (create_directory_) {
    auto res = createDirectoryHierarchy(*client, remote_path.generic_string(), disable_directory_listing);
    switch (res) {
      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_OK:
        break;
      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_STAT_FAILED:
        context->yield();
        return false;
      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_A_DIRECTORY:
      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_FOUND:
      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_PERMISSION_DENIED:
        session->transfer(flow_file, Failure);
        put_connection_back_to_cache();
        return true;
      default:
        logger_->log_error("Unknown createDirectoryHierarchy result: %hhu", static_cast<uint8_t>(res));
        context->yield();
        return false;
    }
  }

  /* Upload file */
  /* target_path is where the bytes are written first, final_target_path is where the file ends up */
  auto target_path = remote_path;
  if (!IsNullOrEmpty(temp_file_name)) {
    target_path /= temp_file_name;
  } else if (dot_rename_) {
    target_path /= "." + resolved_filename;
  } else {
    target_path /= resolved_filename;
  }
  std::string final_target_path = (remote_path / resolved_filename).generic_string();
  /* generic_string() rather than path::c_str(): the latter is a wide string on Windows and must not be passed to %s */
  logger_->log_debug("The target path is %s, final target path is %s", target_path.generic_string(), final_target_path.c_str());

  try {
    session->read(flow_file, [&client, &target_path, this](const std::shared_ptr<io::InputStream>& stream) {
      if (!client->putFile(target_path.generic_string(),
                           *stream,
                           conflict_resolution_ == CONFLICT_RESOLUTION_REPLACE /*overwrite*/,
                           gsl::narrow<int64_t>(stream->size()) /*expected_size*/)) {
        throw utils::SFTPException{client->getLastError()};
      }
      return gsl::narrow<int64_t>(stream->size());
    });
  } catch (const utils::SFTPException& ex) {
    /* Never use the exception text as the format string itself */
    logger_->log_debug("%s", ex.what());
    session->transfer(flow_file, Failure);
    return true;
  }

  /* Move file to its final place */
  if (target_path != final_target_path) {
    if (!client->rename(target_path.generic_string(), final_target_path, conflict_resolution_ == CONFLICT_RESOLUTION_REPLACE /*overwrite*/)) {
      logger_->log_error("Failed to move temporary file %s to final path %s", target_path.generic_string(), final_target_path);
      /* Best effort clean-up of the temporary file; the flow file fails either way */
      if (!client->removeFile(target_path.generic_string())) {
        logger_->log_error("Failed to remove temporary file %s", target_path.generic_string());
      }
      session->transfer(flow_file, Failure);
      return true;
    }
  }

  /* Set file attributes if needed */
  if (last_modified ||
      permissions_set ||
      remote_owner_set ||
      remote_group_set) {
    utils::SFTPClient::SFTPAttributes attrs{};
    attrs.flags = 0U;
    if (last_modified) {
      /*
       * NiFi doesn't set atime, only mtime, but because they can only be set together,
       * if we don't want to modify atime, we first have to get it.
       * Therefore setting them both saves an extra protocol round.
       */
      attrs.flags |= utils::SFTPClient::SFTP_ATTRIBUTE_MTIME | utils::SFTPClient::SFTP_ATTRIBUTE_ATIME;
      attrs.mtime = std::chrono::duration_cast<std::chrono::seconds>(last_modified->time_since_epoch()).count();
      attrs.atime = std::chrono::duration_cast<std::chrono::seconds>(last_modified->time_since_epoch()).count();
    }
    if (permissions_set) {
      attrs.flags |= utils::SFTPClient::SFTP_ATTRIBUTE_PERMISSIONS;
      attrs.permissions = permissions;
    }
    if (remote_owner_set) {
      attrs.flags |= utils::SFTPClient::SFTP_ATTRIBUTE_UID;
      attrs.uid = remote_owner;
    }
    if (remote_group_set) {
      attrs.flags |= utils::SFTPClient::SFTP_ATTRIBUTE_GID;
      attrs.gid = remote_group;
    }
    if (!client->setAttributes(final_target_path, attrs)) {
      /* This is not fatal, just log a warning (naming the path the attributes were actually applied to) */
      logger_->log_warn("Failed to set file attributes for %s", final_target_path);
    }
  }

  session->transfer(flow_file, Success);
  put_connection_back_to_cache();
  return true;
}
/**
 * Processes flow files one by one until either the configured batch size is
 * reached or processOne() reports that it could not go on.
 *
 * @param context process context handed on to processOne()
 * @param session process session handed on to processOne()
 */
void PutSFTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
  /* A batch size that is not positive means there is effectively no upper bound */
  const uint64_t max_flow_files = batch_size_ > 0 ? batch_size_ : std::numeric_limits<uint64_t>::max();

  uint64_t processed = 0;
  while (processed < max_flow_files && this->processOne(context, session)) {
    ++processed;
  }
}
/* Registers PutSFTP as a Processor through the REGISTER_RESOURCE macro; the macro is defined outside this file (presumably in core/Resource.h — confirm). */
REGISTER_RESOURCE(PutSFTP, Processor);
} // namespace org::apache::nifi::minifi::processors