extensions/sftp/processors/SFTPProcessorBase.cpp (289 lines of code) (raw):
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "SFTPProcessorBase.h"
#include <algorithm>
#include <cctype>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <iterator>
#include <limits>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "ResourceClaim.h"
#include "core/FlowFile.h"
#include "core/ProcessContext.h"
#include "core/Relationship.h"
#include "core/logging/Logger.h"
#include "io/BufferStream.h"
#include "utils/ByteArrayCallback.h"
#include "utils/StringUtils.h"
#include "utils/ProcessorConfigUtils.h"
namespace org::apache::nifi::minifi::processors {
SFTPProcessorBase::SFTPProcessorBase(const std::string_view name, const utils::Identifier& uuid)
: ProcessorImpl(name, uuid),
connection_timeout_(0),
data_timeout_(0),
strict_host_checking_(false),
use_keepalive_on_timeout_(false),
use_compression_(false),
running_(true) {
}
/**
 * Stops and joins the keepalive thread if one was started. The pooled connections themselves are
 * released by the member destructors afterwards.
 */
SFTPProcessorBase::~SFTPProcessorBase() {
  if (!keepalive_thread_.joinable()) {
    return;
  }
  // Flip the flag under the mutex the keepalive thread waits with, so the wake-up cannot be missed.
  std::unique_lock<std::mutex> stop_lock(connections_mutex_);
  running_ = false;
  keepalive_cv_.notify_one();
  stop_lock.unlock();

  keepalive_thread_.join();
}
void SFTPProcessorBase::notifyStop() {
logger_->log_debug("Got notifyStop, stopping keepalive thread and clearing connections");
cleanupConnectionCache();
}
/**
 * Reads the properties shared by all SFTP processors that do not depend on a flow file and stores
 * them in member variables: host key verification, timeouts, keepalive and proxy type.
 * Errors raised by the utils::parse* helpers are not caught here and propagate to the caller.
 *
 * @param context processor context the properties are read from
 */
void SFTPProcessorBase::parseCommonPropertiesOnSchedule(core::ProcessContext& context) {
  // Host key verification
  strict_host_checking_ = utils::parseBoolProperty(context, StrictHostKeyChecking);
  host_key_file_ = utils::parseOptionalProperty(context, HostKeyFile).value_or("");

  // Timeouts and keepalive
  connection_timeout_ = utils::parseDurationProperty(context, ConnectionTimeout);
  data_timeout_ = utils::parseDurationProperty(context, DataTimeout);
  use_keepalive_on_timeout_ = utils::parseBoolProperty(context, SendKeepaliveOnTimeout);

  // Proxy (the per-connection proxy details are read on trigger)
  proxy_type_ = utils::parseProperty(context, ProxyType);
}
SFTPProcessorBase::CommonProperties::CommonProperties()
: port(0U)
, proxy_port(0U) {
}
/**
 * Evaluates the connection-related properties for one flow file into `common_properties`.
 *
 * @param context processor context the properties are read from
 * @param flow_file flow file the properties are evaluated against (forwarded to the utils::parse* helpers)
 * @param common_properties output; fields are assigned in order, so it may be partially filled on failure
 * @return true on success; false if any property failed to parse or a port value does not fit into
 *         16 bits (the error is logged)
 */
bool SFTPProcessorBase::parseCommonPropertiesOnTrigger(core::ProcessContext& context, const core::FlowFile* const flow_file, CommonProperties& common_properties) {
  try {
    common_properties.hostname = utils::parseProperty(context, Hostname, flow_file);
    // gsl::narrow throws when the value does not fit into uint16_t; that is reported via the catch below.
    common_properties.port = gsl::narrow<uint16_t>(utils::parseU64Property(context, Port, flow_file));
    common_properties.username = utils::parseOptionalProperty(context, Username, flow_file).value_or("");
    common_properties.private_key_path = utils::parseOptionalProperty(context, PrivateKeyPath, flow_file).value_or("");
    common_properties.private_key_passphrase = utils::parseOptionalProperty(context, PrivateKeyPassphrase, flow_file).value_or("");
    common_properties.password = utils::parseOptionalProperty(context, Password, flow_file).value_or("");
    common_properties.proxy_host = utils::parseOptionalProperty(context, ProxyHost, flow_file).value_or("");
    // The proxy port must come from the Proxy Port property (not Proxy Host); 0 when it is not set.
    common_properties.proxy_port = gsl::narrow<uint16_t>(utils::parseOptionalU64Property(context, ProxyPort, flow_file).value_or(0));
    common_properties.proxy_username = utils::parseOptionalProperty(context, HttpProxyUsername, flow_file).value_or("");
    common_properties.proxy_password = utils::parseOptionalProperty(context, HttpProxyPassword, flow_file).value_or("");
  } catch (const std::exception& e) {
    logger_->log_error("Failed to parse common properties on {}", e.what());
    return false;
  }
  return true;
}
/**
 * Strict weak ordering for the connection pool.
 * Compares lexicographically over hostname, port, username, proxy_type, proxy_host, proxy_port and proxy_username.
 */
bool SFTPProcessorBase::ConnectionCacheKey::operator<(const SFTPProcessorBase::ConnectionCacheKey& other) const {
  const auto as_tuple = [](const ConnectionCacheKey& cache_key) {
    return std::tie(cache_key.hostname, cache_key.port, cache_key.username, cache_key.proxy_type,
                    cache_key.proxy_host, cache_key.proxy_port, cache_key.proxy_username);
  };
  return as_tuple(*this) < as_tuple(other);
}
/**
 * Field-wise equality over hostname, port, username, proxy_type, proxy_host, proxy_port and proxy_username.
 * These are the same fields operator< compares.
 */
bool SFTPProcessorBase::ConnectionCacheKey::operator==(const SFTPProcessorBase::ConnectionCacheKey& other) const {
  const auto as_tuple = [](const ConnectionCacheKey& cache_key) {
    return std::tie(cache_key.hostname, cache_key.port, cache_key.username, cache_key.proxy_type,
                    cache_key.proxy_host, cache_key.proxy_port, cache_key.proxy_username);
  };
  return as_tuple(*this) == as_tuple(other);
}
/**
 * Takes the pooled connection for `key` out of the pool, if there is one.
 *
 * The entry is removed from both connections_ and lru_. A missing lru_ entry is only logged at trace level.
 *
 * @param key identity of the wanted connection
 * @return the pooled client (now owned by the caller), or nullptr when nothing is pooled for `key`
 */
std::unique_ptr<utils::SFTPClient> SFTPProcessorBase::getConnectionFromCache(const SFTPProcessorBase::ConnectionCacheKey& key) {
  std::lock_guard<std::mutex> lock(connections_mutex_);

  const auto cached = connections_.find(key);
  if (cached == connections_.end()) {
    return nullptr;
  }

  logger_->log_debug("Removing {}@{}:{} from SFTP connection pool", key.username, key.hostname, key.port);

  if (const auto lru_pos = std::find(lru_.begin(), lru_.end(), key); lru_pos != lru_.end()) {
    lru_.erase(lru_pos);
  } else {
    logger_->log_trace("Assertion error: can't find key in LRU cache");
  }

  std::unique_ptr<utils::SFTPClient> client = std::move(cached->second);
  connections_.erase(cached);
  return client;
}
/**
 * Returns `connection` to the pool under `key` so a later getConnectionFromCache() can reuse it.
 *
 * Eviction: while the pool holds CONNECTION_CACHE_MAX_SIZE or more connections, entries are removed from
 * the back of lru_. New keys are pushed to the front, so the back holds the oldest additions.
 *
 * Duplicates: if a connection for `key` is already pooled (e.g. two triggers each created one), the pooled
 * one is kept and `connection` is destroyed. lru_ is left untouched so it keeps mirroring connections_.
 *
 * Wakes the keepalive thread after a connection is added.
 *
 * @param key identity of the connection
 * @param connection connection to pool; always consumed (moved into the pool or destroyed)
 */
void SFTPProcessorBase::addConnectionToCache(const SFTPProcessorBase::ConnectionCacheKey& key, std::unique_ptr<utils::SFTPClient>&& connection) {
  std::lock_guard<std::mutex> lock(connections_mutex_);
  if (connections_.find(key) != connections_.end()) {
    // Pushing the key again would leave a duplicate entry in lru_ with no matching connection.
    logger_->log_debug("{}@{}:{} is already in the SFTP connection pool, discarding the new connection",
        key.username,
        key.hostname,
        key.port);
    connection.reset();
    return;
  }
  // The emptiness check keeps back() from being called on an empty lru_ if it ever falls out of sync
  // with connections_ (or if the maximum size is 0).
  while (!lru_.empty() && connections_.size() >= SFTPProcessorBase::CONNECTION_CACHE_MAX_SIZE) {
    // lru_key stays valid until pop_back(), which is the last use.
    const auto& lru_key = lru_.back();
    logger_->log_debug("SFTP connection pool is full, removing {}@{}:{}",
        lru_key.username,
        lru_key.hostname,
        lru_key.port);
    connections_.erase(lru_key);
    lru_.pop_back();
  }
  logger_->log_debug("Adding {}@{}:{} to SFTP connection pool",
      key.username,
      key.hostname,
      key.port);
  connections_.emplace(key, std::move(connection));
  lru_.push_front(key);
  keepalive_cv_.notify_one();
}
/**
 * Body of the keepalive thread.
 *
 * Holds connections_mutex_ except while waiting on keepalive_cv_.
 * - While the pool is empty, it waits until a connection is added or running_ is cleared.
 * - Otherwise it asks every pooled client to send a keepalive if one is due.
 * - It then sleeps until the earliest reported next keepalive, at most 10 s and at least 1 s,
 *   or until running_ is cleared.
 */
void SFTPProcessorBase::keepaliveThreadFunc() {
  std::unique_lock<std::mutex> lock(connections_mutex_);
  for (;;) {
    if (connections_.empty()) {
      keepalive_cv_.wait(lock, [this] { return !running_ || !connections_.empty(); });
    }
    if (!running_) {
      logger_->log_trace("Stopping keepalive thread");
      return;  // `lock` releases connections_mutex_ on scope exit
    }

    int sleep_seconds = 10;
    for (auto& [cache_key, client] : connections_) {
      int seconds_until_next = 0;
      if (!client->sendKeepAliveIfNeeded(seconds_until_next)) {
        logger_->log_debug("Failed to send keepalive to {}@{}:{}",
            cache_key.username,
            cache_key.hostname,
            cache_key.port);
        continue;
      }
      logger_->log_debug("Sent keepalive to {}@{}:{} if needed, next keepalive in {} s",
          cache_key.username,
          cache_key.hostname,
          cache_key.port,
          seconds_until_next);
      sleep_seconds = std::min(sleep_seconds, seconds_until_next);
    }
    // Never sleep for less than a second, to avoid a busy loop.
    sleep_seconds = std::max(sleep_seconds, 1);

    logger_->log_trace("Keepalive thread is going to sleep for {} s", sleep_seconds);
    keepalive_cv_.wait_for(lock, std::chrono::seconds(sleep_seconds), [this] { return !running_; });
    if (!running_) {
      return;
    }
  }
}
/**
 * Starts the keepalive thread when keepalives are enabled and no thread is currently joinable.
 */
void SFTPProcessorBase::startKeepaliveThreadIfNeeded() {
  if (!use_keepalive_on_timeout_ || keepalive_thread_.joinable()) {
    return;
  }
  running_ = true;
  keepalive_thread_ = std::thread(&SFTPProcessorBase::keepaliveThreadFunc, this);
}
/**
 * Stops the keepalive thread (if it is running) and destroys every pooled connection.
 */
void SFTPProcessorBase::cleanupConnectionCache() {
  if (keepalive_thread_.joinable()) {
    std::unique_lock<std::mutex> stop_lock(connections_mutex_);
    running_ = false;
    keepalive_cv_.notify_one();
    stop_lock.unlock();
    keepalive_thread_.join();
  }
  // Once the keepalive thread is joined it no longer touches the pool, so the pool is cleared without
  // taking connections_mutex_.
  // NOTE(review): this assumes no onTrigger call uses the pool concurrently with this cleanup — confirm.
  connections_.clear();
  lru_.clear();
}
/**
 * Returns a connected SFTP client for `connection_cache_key`.
 *
 * A pooled connection is reused when available (and removed from the pool). Otherwise a new client is
 * configured and connected. Configuration covers host key file, password / key credentials, optional
 * proxy, timeouts, keepalive and compression.
 *
 * @param connection_cache_key host, port, user and proxy identity of the connection
 * @param password password credential; skipped when empty
 * @param private_key_path private key credential; skipped when empty
 * @param private_key_passphrase passphrase for the private key
 * @param proxy_password password for the proxy
 * @param buffer_size forwarded to the SFTPClient constructor
 * @return the client, or nullptr if any configuration step or the connect fails (the error is logged)
 */
std::unique_ptr<utils::SFTPClient> SFTPProcessorBase::getOrCreateConnection(
    const SFTPProcessorBase::ConnectionCacheKey& connection_cache_key,
    const std::string& password,
    const std::string& private_key_path,
    const std::string& private_key_passphrase,
    const std::string& proxy_password,
    const size_t buffer_size) {
  if (auto pooled = getConnectionFromCache(connection_cache_key); pooled != nullptr) {
    return pooled;
  }

  auto fresh_client = std::make_unique<utils::SFTPClient>(connection_cache_key.hostname,
                                                          connection_cache_key.port,
                                                          connection_cache_key.username,
                                                          buffer_size);

  if (!IsNullOrEmpty(host_key_file_) && !fresh_client->setHostKeyFile(host_key_file_, strict_host_checking_)) {
    logger_->log_error("Cannot set host key file");
    return nullptr;
  }

  // Credentials: either or both may be configured.
  if (!IsNullOrEmpty(password)) {
    fresh_client->setPasswordAuthenticationCredentials(password);
  }
  if (!IsNullOrEmpty(private_key_path)) {
    fresh_client->setPublicKeyAuthenticationCredentials(private_key_path, private_key_passphrase);
  }

  if (connection_cache_key.proxy_type != PROXY_TYPE_DIRECT) {
    // Any non-direct proxy type other than HTTP is handled as SOCKS.
    const auto proxy_kind = (connection_cache_key.proxy_type == PROXY_TYPE_HTTP)
        ? utils::SFTPClient::ProxyType::Http
        : utils::SFTPClient::ProxyType::Socks;
    http::HTTPProxy proxy;
    proxy.host = connection_cache_key.proxy_host;
    proxy.port = connection_cache_key.proxy_port;
    proxy.username = connection_cache_key.proxy_username;
    proxy.password = proxy_password;
    if (!fresh_client->setProxy(proxy_kind, proxy)) {
      logger_->log_error("Cannot set proxy");
      return nullptr;
    }
  }

  if (!fresh_client->setConnectionTimeout(connection_timeout_)) {
    logger_->log_error("Cannot set connection timeout");
    return nullptr;
  }
  fresh_client->setDataTimeout(data_timeout_);
  fresh_client->setSendKeepAlive(use_keepalive_on_timeout_);
  if (!fresh_client->setUseCompression(use_compression_)) {
    logger_->log_error("Cannot set compression");
    return nullptr;
  }

  if (!fresh_client->connect()) {
    logger_->log_error("Cannot connect to SFTP server");
    return nullptr;
  }
  return fresh_client;
}
/**
 * Ensures `remote_path` exists on the server, creating missing directories.
 *
 * With directory listing disabled, the hierarchy is created without any stat() before or after. The
 * creation result is ignored and the function reports success.
 *
 * Otherwise the path is stat()-ed first, following symlinks:
 * - If the stat succeeds, the path must not be reported as a non-directory.
 * - If the stat fails, the hierarchy is created and stat()-ed again to verify it.
 *
 * A non-directory is only detected when the server returns permission bits.
 *
 * @param client connected SFTP client
 * @param remote_path directory path on the server
 * @param disable_directory_listing skip all stat() checks when true
 * @return CREATE_DIRECTORY_HIERARCHY_ERROR_OK on success, otherwise the reason the path is unusable
 */
SFTPProcessorBase::CreateDirectoryHierarchyError SFTPProcessorBase::createDirectoryHierarchy(
    utils::SFTPClient& client,
    const std::string& remote_path,
    bool disable_directory_listing) {
  const auto is_non_directory = [](const LIBSSH2_SFTP_ATTRIBUTES& stat_result) {
    return (stat_result.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS) && !LIBSSH2_SFTP_S_ISDIR(stat_result.permissions);
  };

  if (disable_directory_listing) {
    (void) client.createDirectoryHierarchy(remote_path);
    return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_OK;
  }

  LIBSSH2_SFTP_ATTRIBUTES attrs;
  if (client.stat(remote_path, true /*follow_symlinks*/, attrs)) {
    if (is_non_directory(attrs)) {
      logger_->log_error("Remote path {} is not a directory", remote_path.c_str());
      return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_A_DIRECTORY;
    }
    logger_->log_debug("Found remote directory {}", remote_path.c_str());
    return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_OK;
  }
  // A missing path is expected here; any other stat failure is logged, but creation is still attempted.
  if (client.getLastError() != utils::SFTPError::FileDoesNotExist) {
    logger_->log_error("Failed to stat {}", remote_path.c_str());
  }

  (void) client.createDirectoryHierarchy(remote_path);

  // Verify the creation attempt.
  if (client.stat(remote_path, true /*follow_symlinks*/, attrs)) {
    if (is_non_directory(attrs)) {
      logger_->log_error("Remote path {} is not a directory", remote_path.c_str());
      return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_A_DIRECTORY;
    }
    return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_OK;
  }

  const auto stat_error = client.getLastError();
  if (stat_error == utils::SFTPError::FileDoesNotExist) {
    logger_->log_error("Could not find remote directory {} after creating it", remote_path.c_str());
    return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_FOUND;
  }
  if (stat_error == utils::SFTPError::PermissionDenied) {
    logger_->log_error("Permission denied when reading remote directory {} after creating it", remote_path.c_str());
    return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_PERMISSION_DENIED;
  }
  logger_->log_error("Failed to stat {}", remote_path.c_str());
  return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_STAT_FAILED;
}
} // namespace org::apache::nifi::minifi::processors