extensions/sftp/processors/FetchSFTP.cpp (134 lines of code) (raw):
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "FetchSFTP.h"
#include <algorithm>
#include <cstdint>
#include <memory>
#include <utility>
#include "core/FlowFile.h"
#include "core/ProcessContext.h"
#include "core/Relationship.h"
#include "core/Resource.h"
#include "io/BufferStream.h"
#include "utils/ConfigurationUtils.h"
#include "utils/StringUtils.h"
#include "utils/file/FileUtils.h"
#include "utils/ProcessorConfigUtils.h"
namespace org::apache::nifi::minifi::processors {
void FetchSFTP::initialize() {
logger_->log_trace("Initializing FetchSFTP");
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
FetchSFTP::FetchSFTP(std::string_view name, const utils::Identifier& uuid /*= utils::Identifier()*/)
: SFTPProcessorBase(name, uuid) {
logger_ = core::logging::LoggerFactory<FetchSFTP>::getLogger(uuid_);
}
FetchSFTP::~FetchSFTP() = default;
void FetchSFTP::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
parseCommonPropertiesOnSchedule(context);
completion_strategy_ = utils::parseProperty(context, CompletionStrategy);
create_directory_ = utils::parseBoolProperty(context, CreateDirectory);
disable_directory_listing_ = utils::parseBoolProperty(context, DisableDirectoryListing);
use_compression_ = utils::parseBoolProperty(context, UseCompression);
startKeepaliveThreadIfNeeded();
}
void FetchSFTP::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
auto flow_file = session.get();
if (flow_file == nullptr) {
return;
}
/* Parse common properties */
SFTPProcessorBase::CommonProperties common_properties;
if (!parseCommonPropertiesOnTrigger(context, flow_file.get(), common_properties)) {
context.yield();
return;
}
std::filesystem::path remote_file;
if (auto remote_file_str = context.getProperty(RemoteFile, flow_file.get())) {
remote_file = std::filesystem::path(*remote_file_str, std::filesystem::path::format::generic_format);
}
std::filesystem::path move_destination_directory;
if (auto move_destination_directory_str = context.getProperty(MoveDestinationDirectory, flow_file.get())) {
move_destination_directory = std::filesystem::path(*move_destination_directory_str, std::filesystem::path::format::generic_format);
}
/* Get SFTPClient from cache or create it */
const SFTPProcessorBase::ConnectionCacheKey connection_cache_key = {common_properties.hostname,
common_properties.port,
common_properties.username,
proxy_type_,
common_properties.proxy_host,
common_properties.proxy_port,
common_properties.proxy_username};
const auto buffer_size = utils::configuration::getBufferSize(*context.getConfiguration());
auto client = getOrCreateConnection(connection_cache_key,
common_properties.password,
common_properties.private_key_path,
common_properties.private_key_passphrase,
common_properties.proxy_password,
buffer_size);
if (client == nullptr) {
context.yield();
return;
}
/*
* Unless we're sure that the connection is good, we don't want to put it back to the cache.
* So we will only call this when we're sure that the connection is OK.
*/
auto put_connection_back_to_cache = [this, &connection_cache_key, &client]() {
addConnectionToCache(connection_cache_key, std::move(client));
};
/* Download file */
try {
session.write(flow_file, [&remote_file, &client](const std::shared_ptr<io::OutputStream>& stream) -> int64_t {
auto bytes_read = client->getFile(remote_file.generic_string(), *stream);
if (!bytes_read) {
throw utils::SFTPException{client->getLastError()};
}
return gsl::narrow<int64_t>(*bytes_read);
});
} catch (const utils::SFTPException& ex) {
logger_->log_debug("{}", ex.what());
switch (ex.error()) {
case utils::SFTPError::PermissionDenied:
session.transfer(flow_file, PermissionDenied);
put_connection_back_to_cache();
return;
case utils::SFTPError::FileDoesNotExist:
session.transfer(flow_file, NotFound);
put_connection_back_to_cache();
return;
case utils::SFTPError::CommunicationFailure:
case utils::SFTPError::IoError:
session.transfer(flow_file, CommsFailure);
return;
default:
session.transfer(flow_file, PermissionDenied);
return;
}
}
/* Set attributes */
std::string child_path = remote_file.filename().generic_string();
session.putAttribute(*flow_file, ATTRIBUTE_SFTP_REMOTE_HOST, common_properties.hostname);
session.putAttribute(*flow_file, ATTRIBUTE_SFTP_REMOTE_PORT, std::to_string(common_properties.port));
session.putAttribute(*flow_file, ATTRIBUTE_SFTP_REMOTE_FILENAME, remote_file.generic_string());
flow_file->setAttribute(core::SpecialFlowAttribute::FILENAME, child_path);
if (!remote_file.parent_path().empty()) {
flow_file->setAttribute(core::SpecialFlowAttribute::PATH, (remote_file.parent_path() / "").generic_string());
}
/* Execute completion strategy */
if (completion_strategy_ == COMPLETION_STRATEGY_DELETE_FILE) {
if (!client->removeFile(remote_file.generic_string())) {
logger_->log_warn("Completion Strategy is Delete File, but failed to delete remote file \"{}\"", remote_file.generic_string());
}
} else if (completion_strategy_ == COMPLETION_STRATEGY_MOVE_FILE) {
bool should_move = true;
if (create_directory_) {
auto res = createDirectoryHierarchy(*client, move_destination_directory.generic_string(), disable_directory_listing_);
if (res != SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_OK) {
should_move = false;
}
}
if (!should_move) {
logger_->log_warn("Completion Strategy is Move File, but failed to create Move Destination Directory \"{}\"", move_destination_directory.generic_string());
} else {
auto target_path = move_destination_directory / child_path;
if (!client->rename(remote_file.generic_string(), target_path.generic_string(), false /*overwrite*/)) {
logger_->log_warn(R"(Completion Strategy is Move File, but failed to move file "{}" to "{}")", remote_file.generic_string(), target_path.generic_string());
}
}
}
session.transfer(flow_file, Success);
put_connection_back_to_cache();
}
REGISTER_RESOURCE(FetchSFTP, Processor);
} // namespace org::apache::nifi::minifi::processors