in extensions/sftp/processors/FetchSFTP.cpp [75:190]
/**
 * Fetches one remote file over SFTP into the content of the incoming flow file.
 *
 * Steps, in order:
 *  1. Take a flow file from the session; return immediately if there is none.
 *  2. Parse the common connection properties, then the RemoteFile and
 *     MoveDestinationDirectory properties (both evaluated against the flow file).
 *  3. Obtain an SFTP client through getOrCreateConnection(); yield if that fails.
 *  4. Download the remote file into the flow file. On utils::SFTPException the
 *     flow file is routed to PermissionDenied, NotFound or CommsFailure.
 *  5. Set the SFTP host/port/filename attributes and the filename/path attributes.
 *  6. Run the configured completion strategy (delete or move). Failures in this
 *     step are only logged as warnings; the flow file still goes to Success.
 *  7. Transfer the flow file to Success and put the connection back into the cache.
 *
 * @param context process context used for property lookup and for yielding
 * @param session process session used to get, write and route the flow file
 */
void FetchSFTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
  auto flow_file = session->get();
  if (flow_file == nullptr) {
    return;
  }

  /* Parse common properties */
  SFTPProcessorBase::CommonProperties common_properties;
  if (!parseCommonPropertiesOnTrigger(context, flow_file, common_properties)) {
    context->yield();
    return;
  }

  /* Remote paths are always built with the generic ('/'-separated) format, independent of the local platform. */
  std::filesystem::path remote_file;
  if (auto remote_file_str = context->getProperty(RemoteFile, flow_file)) {
    remote_file = std::filesystem::path(*remote_file_str, std::filesystem::path::format::generic_format);
  }
  std::filesystem::path move_destination_directory;
  if (auto move_destination_directory_str = context->getProperty(MoveDestinationDirectory, flow_file)) {
    move_destination_directory = std::filesystem::path(*move_destination_directory_str, std::filesystem::path::format::generic_format);
  }

  /* Get SFTPClient from cache or create it */
  const SFTPProcessorBase::ConnectionCacheKey connection_cache_key = {common_properties.hostname,
                                                                      common_properties.port,
                                                                      common_properties.username,
                                                                      proxy_type_,
                                                                      common_properties.proxy_host,
                                                                      common_properties.proxy_port,
                                                                      common_properties.proxy_username};
  auto client = getOrCreateConnection(connection_cache_key,
                                      common_properties.password,
                                      common_properties.private_key_path,
                                      common_properties.private_key_passphrase,
                                      common_properties.proxy_password);
  if (client == nullptr) {
    context->yield();
    return;
  }

  /*
   * Unless we're sure that the connection is good, we don't want to put it back to the cache.
   * So we will only call this when we're sure that the connection is OK.
   * The lambda moves from `client`, so `client` must not be used after it has been invoked.
   */
  auto put_connection_back_to_cache = [this, &connection_cache_key, &client]() {
    addConnectionToCache(connection_cache_key, std::move(client));
  };

  /* Download file */
  try {
    session->write(flow_file, [&remote_file, &client](const std::shared_ptr<io::OutputStream>& stream) -> int64_t {
      auto bytes_read = client->getFile(remote_file.generic_string(), *stream);
      if (!bytes_read) {
        /* Surface the client's last error so the handler below can pick the relationship. */
        throw utils::SFTPException{client->getLastError()};
      }
      return gsl::narrow<int64_t>(*bytes_read);
    });
  } catch (const utils::SFTPException& ex) {
    /*
     * Pass the message as an argument, never as the format string: the text originates from the
     * SFTP client's last error and may contain '%' characters.
     */
    logger_->log_debug("%s", ex.what());
    switch (ex.error()) {
      case utils::SFTPError::PermissionDenied:
        session->transfer(flow_file, PermissionDenied);
        /* The request was answered, so the connection itself is considered usable. */
        put_connection_back_to_cache();
        return;
      case utils::SFTPError::FileDoesNotExist:
        session->transfer(flow_file, NotFound);
        put_connection_back_to_cache();
        return;
      case utils::SFTPError::CommunicationFailure:
      case utils::SFTPError::IoError:
        /* The connection is deliberately not returned to the cache on these errors. */
        session->transfer(flow_file, CommsFailure);
        return;
      default:
        /*
         * Any other error code is routed to PermissionDenied and the connection is not cached.
         * NOTE(review): routing unknown errors to PermissionDenied looks surprising - confirm it is intended.
         */
        session->transfer(flow_file, PermissionDenied);
        return;
    }
  }

  /* Set attributes */
  std::string child_path = remote_file.filename().generic_string();

  session->putAttribute(flow_file, ATTRIBUTE_SFTP_REMOTE_HOST, common_properties.hostname);
  session->putAttribute(flow_file, ATTRIBUTE_SFTP_REMOTE_PORT, std::to_string(common_properties.port));
  session->putAttribute(flow_file, ATTRIBUTE_SFTP_REMOTE_FILENAME, remote_file.generic_string());
  flow_file->setAttribute(core::SpecialFlowAttribute::FILENAME, child_path);
  if (!remote_file.parent_path().empty()) {
    /* Appending an empty component yields the parent directory with a trailing separator. */
    flow_file->setAttribute(core::SpecialFlowAttribute::PATH, (remote_file.parent_path() / "").generic_string());
  }

  /* Execute completion strategy */
  if (completion_strategy_ == COMPLETION_STRATEGY_DELETE_FILE) {
    if (!client->removeFile(remote_file.generic_string())) {
      logger_->log_warn("Completion Strategy is Delete File, but failed to delete remote file \"%s\"", remote_file.generic_string());
    }
  } else if (completion_strategy_ == COMPLETION_STRATEGY_MOVE_FILE) {
    /* The move is skipped only when directory creation was requested and did not succeed. */
    bool should_move = true;
    if (create_directory_) {
      auto res = createDirectoryHierarchy(*client, move_destination_directory.generic_string(), disable_directory_listing_);
      if (res != SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_OK) {
        should_move = false;
      }
    }
    if (!should_move) {
      logger_->log_warn("Completion Strategy is Move File, but failed to create Move Destination Directory \"%s\"", move_destination_directory.generic_string());
    } else {
      auto target_path = move_destination_directory / child_path;
      if (!client->rename(remote_file.generic_string(), target_path.generic_string(), false /*overwrite*/)) {
        logger_->log_warn(R"(Completion Strategy is Move File, but failed to move file "%s" to "%s")", remote_file.generic_string(), target_path.generic_string());
      }
    }
  }

  session->transfer(flow_file, Success);
  put_connection_back_to_cache();
}