extensions/standard-processors/processors/PutTCP.cpp (160 lines of code) (raw):
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PutTCP.h"
#include <exception>
#include <system_error>
#include <tuple>
#include <utility>
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "core/logging/Logger.h"
#include "range/v3/range/conversion.hpp"
#include "utils/gsl.h"
#include "utils/net/AsioCoro.h"
#include "utils/net/AsioSocketUtils.h"
#include "utils/ProcessorConfigUtils.h"
using namespace std::literals::chrono_literals;
using org::apache::nifi::minifi::utils::net::TcpSocket;
using org::apache::nifi::minifi::utils::net::SslSocket;
namespace org::apache::nifi::minifi::processors {
// Size, in bytes, of the buffer used to read flow file content before each chunk is written to the socket.
constexpr size_t chunk_size = 1024;
/**
 * Creates the processor and attaches a logger bound to this instance's UUID.
 *
 * @param name processor name, forwarded to ProcessorImpl
 * @param uuid processor identifier, forwarded to ProcessorImpl
 */
PutTCP::PutTCP(const std::string_view name, const utils::Identifier& uuid) : ProcessorImpl(name, uuid) {
  using PutTcpLoggerFactory = core::logging::LoggerFactory<PutTCP>;
  logger_ = PutTcpLoggerFactory::getLogger(uuid_);
}
// Defaulted destructor, defined out of line in this translation unit.
PutTCP::~PutTCP() = default;
/**
 * Declares the properties and relationships this processor supports.
 */
void PutTCP::initialize() {
  // Properties first, then relationships.
  setSupportedProperties(Properties);

  setSupportedRelationships(Relationships);
}
// No-op: this processor performs no work when notified of a stop.
void PutTCP::notifyStop() {}
/**
 * Validates the configuration and caches the settings used by onTrigger.
 *
 * - Hostname and Port must be set, otherwise scheduling fails.
 * - IdleConnectionExpiration is kept only when it is set and positive; otherwise it is cleared.
 * - Timeout falls back to 15 seconds when it is unset or not positive.
 * - When ConnectionPerFlowFile is true the connection cache is disabled, otherwise an empty cache is created.
 * - When an SSL Context Service is configured, an SSL context is built from it.
 *
 * @param context process context the properties and controller services are read from
 * @throws Exception when Hostname or Port is missing, or when the configured SSL Context Service
 *         cannot be found or is not an SSLContextService
 */
void PutTCP::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
  // if the required properties are missing or empty even before evaluating the EL expression, then we can throw in onSchedule, before we waste any flow files
  if (!getProperty(Hostname.name)) {
    throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing hostname"};
  }
  if (!getProperty(Port.name)) {
    throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing port"};
  }

  if (const auto idle_connection_expiration = utils::parseOptionalDurationProperty(context, IdleConnectionExpiration); idle_connection_expiration && *idle_connection_expiration > 0ms)
    idle_connection_expiration_ = idle_connection_expiration;
  else
    idle_connection_expiration_.reset();

  if (const auto timeout = utils::parseOptionalDurationProperty(context, Timeout); timeout && *timeout > 0ms)
    timeout_duration_ = *timeout;
  else
    timeout_duration_ = 15s;

  if (utils::parseBoolProperty(context, ConnectionPerFlowFile))
    connections_.reset();
  else
    connections_.emplace();

  ssl_context_.reset();
  if (const auto context_name = context.getProperty(SSLContextService); context_name && !IsNullOrEmpty(*context_name)) {
    // Look the controller service up once and reuse the result for the type check.
    const auto controller_service = context.getControllerService(*context_name, getUUID());
    if (!controller_service) {
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name);
    }
    const auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(controller_service);
    if (!ssl_context_service) {
      throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service");
    }
    ssl_context_ = utils::net::getSslContext(*ssl_context_service);
  }

  // The delimiter is stored as raw bytes; an unset property yields an empty delimiter.
  const auto delimiter_str = context.getProperty(OutgoingMessageDelimiter).value_or(std::string{});
  delimiter_ = utils::span_to<std::vector>(as_bytes(std::span(delimiter_str)));

  max_size_of_socket_send_buffer_ = utils::parseOptionalDataSizeProperty(context, MaxSizeOfSocketSendBuffer);
}
/**
 * Takes one flow file from the session and sends it to the endpoint given by the
 * Hostname and Port properties (evaluated against the flow file).
 *
 * Yields when there is no flow file. Routes the flow file to Failure when the
 * evaluated hostname or port is empty. Otherwise picks a cached connection handler
 * for the endpoint, or creates a new one (SSL when an SSL context is configured,
 * plain TCP otherwise), and hands the flow file to processFlowFile.
 */
void PutTCP::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
  const auto flow_file = session.get();
  if (!flow_file) {
    yield();
    return;
  }

  removeExpiredConnections();

  auto hostname = context.getProperty(Hostname, flow_file.get()).value_or(std::string{});
  auto port = context.getProperty(Port, flow_file.get()).value_or(std::string{});
  const bool endpoint_is_incomplete = hostname.empty() || port.empty();
  if (endpoint_is_incomplete) {
    logger_->log_error("[{}] invalid target endpoint: hostname: {}, port: {}", flow_file->getUUIDStr(),
        hostname.empty() ? "(empty)" : hostname.c_str(),
        port.empty() ? "(empty)" : port.c_str());
    session.transfer(flow_file, Failure);
    return;
  }

  auto connection_id = utils::net::ConnectionId(std::move(hostname), std::move(port));
  std::shared_ptr<utils::net::ConnectionHandlerBase> handler;

  const bool has_cached_handler = connections_ && connections_->contains(connection_id);
  if (has_cached_handler) {
    handler = (*connections_)[connection_id];
  } else {
    if (ssl_context_) {
      handler = std::make_shared<utils::net::ConnectionHandler<SslSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, &*ssl_context_);
    } else {
      handler = std::make_shared<utils::net::ConnectionHandler<TcpSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, nullptr);
    }
    // Cache the new handler only when the connection cache is enabled.
    if (connections_) {
      (*connections_)[connection_id] = handler;
    }
  }

  gsl_Expects(handler);
  processFlowFile(handler, session, flow_file);
}
/**
 * Drops cached connection handlers that are null or, when an idle connection
 * expiration is configured, have not been used within that period.
 * Does nothing when the connection cache is disabled.
 */
void PutTCP::removeExpiredConnections() {
  if (!connections_) {
    return;
  }

  const auto is_stale = [this](auto& entry) -> bool {
    const auto& handler = entry.second;
    if (!handler) {
      return true;
    }
    return idle_connection_expiration_ && !handler->hasBeenUsedIn(*idle_connection_expiration_);
  };
  std::erase_if(*connections_, is_stale);
}
/**
 * Sends the content of one flow file, followed by the configured delimiter, over the
 * given connection and blocks until the send coroutine has finished.
 *
 * @param connection_handler connection to send on; dereferenced unconditionally, so it must not be null
 * @param flow_file_content_stream stream the content is read from
 * @return the error reported by the send coroutine, std::errc::io_error when the coroutine
 *         ended with an exception, or a default-constructed (success) code otherwise
 */
std::error_code PutTCP::sendFlowFileContent(const std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler,
    const std::shared_ptr<io::InputStream>& flow_file_content_stream) {
  std::error_code operation_error;
  io_context_.restart();
  asio::co_spawn(io_context_,
      sendStreamWithDelimiter(*connection_handler, flow_file_content_stream, delimiter_),
      [this, &operation_error](const std::exception_ptr& exception, const std::error_code error_code) {
        if (!exception) {
          operation_error = error_code;
          return;
        }
        // When the coroutine exits via an exception the error_code argument is default-constructed,
        // i.e. it reads as success. Report a failure instead so the flow file is not treated as sent.
        try {
          std::rethrow_exception(exception);
        } catch (const std::exception& e) {
          logger_->log_error("Sending flow file content failed with an exception: {}", e.what());
        } catch (...) {
          logger_->log_error("Sending flow file content failed with an unknown exception");
        }
        operation_error = std::make_error_code(std::errc::io_error);
      });
  io_context_.run();
  return operation_error;
}
/**
 * Coroutine that makes sure the connection has a usable socket, then writes the stream's
 * remaining content in chunk_size-byte pieces, followed by the delimiter.
 *
 * @param connection_handler connection the data is written to
 * @param stream_to_send stream the content is read from, starting at its current position
 * @param delimiter bytes written after the content
 * @return the first connection or write error encountered; std::errc::io_error when reading
 *         from the stream fails or yields no data before the reported size is reached;
 *         a default-constructed (success) code otherwise
 */
asio::awaitable<std::error_code> PutTCP::sendStreamWithDelimiter(utils::net::ConnectionHandlerBase& connection_handler,
    const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter) {
  if (auto connection_error = co_await connection_handler.setupUsableSocket(io_context_)) {  // NOLINT (clang tidy doesnt like coroutines)
    co_return connection_error;
  }

  std::vector<std::byte> data_chunk;
  data_chunk.resize(chunk_size);
  const std::span<std::byte> buffer{data_chunk};

  while (stream_to_send->tell() < stream_to_send->size()) {
    const size_t num_read = stream_to_send->read(buffer);
    // A zero-length read would never advance tell(), so the loop would spin forever; treat it as an I/O error.
    if (io::isError(num_read) || num_read == 0)
      co_return std::make_error_code(std::errc::io_error);
    auto [write_error, bytes_written] = co_await connection_handler.write(asio::buffer(data_chunk, num_read));
    if (write_error)
      co_return write_error;
    logger_->log_trace("Writing flowfile({} bytes) to socket succeeded", bytes_written);
  }

  auto [delimiter_write_error, delimiter_bytes_written] = co_await connection_handler.write(asio::buffer(delimiter));
  if (delimiter_write_error)
    co_return delimiter_write_error;
  logger_->log_trace("Writing delimiter({} bytes) to socket succeeded", delimiter_bytes_written);

  co_return std::error_code();
}
/**
 * Sends a flow file over the given connection and routes it to Success or Failure.
 *
 * If the first attempt fails on a connection that has been used before, the connection is
 * reset and the send is retried once. The content stream is reopened for the retry, because
 * the first attempt may already have read part (or all) of it.
 *
 * @param connection_handler connection to send on
 * @param session session used to open the content stream and to transfer the flow file
 * @param flow_file flow file to send
 */
void PutTCP::processFlowFile(const std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler,
    core::ProcessSession& session,
    const std::shared_ptr<core::FlowFile>& flow_file) {
  auto flow_file_content_stream = session.getFlowFileContentStream(*flow_file);
  if (!flow_file_content_stream) {
    session.transfer(flow_file, Failure);
    return;
  }

  std::error_code operation_error = sendFlowFileContent(connection_handler, flow_file_content_stream);

  if (operation_error && connection_handler->hasBeenUsed()) {
    logger_->log_warn("{} with reused connection, retrying...", operation_error.message());
    connection_handler->reset();
    // Start again from the beginning of the content: the failed attempt advanced the previous stream.
    flow_file_content_stream = session.getFlowFileContentStream(*flow_file);
    if (!flow_file_content_stream) {
      logger_->log_error("Failed to reopen the flow file content for the retry");
      session.transfer(flow_file, Failure);
      return;
    }
    operation_error = sendFlowFileContent(connection_handler, flow_file_content_stream);
  }

  if (operation_error) {
    connection_handler->reset();
    logger_->log_error("{}", operation_error.message());
    session.transfer(flow_file, Failure);
  } else {
    session.transfer(flow_file, Success);
  }
}
// Registers PutTCP as a Processor resource. NOTE(review): the macro presumably comes from core/Resource.h — confirm its exact effect there.
REGISTER_RESOURCE(PutTCP, Processor);
} // namespace org::apache::nifi::minifi::processors