extensions/http-curl/processors/InvokeHTTP.cpp (287 lines of code) (raw):
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "InvokeHTTP.h"
#include <cinttypes>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "core/FlowFile.h"
#include "core/ProcessContext.h"
#include "core/Relationship.h"
#include "core/Resource.h"
#include "io/BufferStream.h"
#include "io/StreamFactory.h"
#include "ResourceClaim.h"
#include "utils/gsl.h"
#include "utils/ProcessorConfigUtils.h"
#include "utils/OptionalUtils.h"
#include "range/v3/view/filter.hpp"
#include "range/v3/algorithm/any_of.hpp"
namespace org::apache::nifi::minifi::processors {
std::string InvokeHTTP::DefaultContentType = "application/octet-stream";
void InvokeHTTP::initialize() {
logger_->log_trace("Initializing InvokeHTTP");
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
namespace {
void setupClientTimeouts(extensions::curl::HTTPClient& client, const core::ProcessContext& context) {
if (auto connection_timeout = context.getProperty<core::TimePeriodValue>(InvokeHTTP::ConnectTimeout))
client.setConnectionTimeout(connection_timeout->getMilliseconds());
if (auto read_timeout = context.getProperty<core::TimePeriodValue>(InvokeHTTP::ReadTimeout))
client.setReadTimeout(read_timeout->getMilliseconds());
}
void setupClientProxy(extensions::curl::HTTPClient& client, const core::ProcessContext& context) {
utils::HTTPProxy proxy = {};
context.getProperty(InvokeHTTP::ProxyHost, proxy.host);
std::string port_str;
if (context.getProperty(InvokeHTTP::ProxyPort, port_str) && !port_str.empty()) {
core::Property::StringToInt(port_str, proxy.port);
}
context.getProperty(InvokeHTTP::ProxyUsername, proxy.username);
context.getProperty(InvokeHTTP::ProxyPassword, proxy.password);
client.setHTTPProxy(proxy);
}
void setupClientPeerVerification(extensions::curl::HTTPClient& client, const core::ProcessContext& context) {
if (auto disable_peer_verification = context.getProperty(InvokeHTTP::DisablePeerVerification) | utils::flatMap(&utils::StringUtils::toBool)) {
client.setPeerVerification(*disable_peer_verification);
}
}
void setupClientFollowRedirects(extensions::curl::HTTPClient& client, const core::ProcessContext& context) {
if (auto follow_redirects = context.getProperty<bool>(InvokeHTTP::FollowRedirects))
client.setFollowRedirects(*follow_redirects);
}
void setupClientContentType(extensions::curl::HTTPClient& client, const core::ProcessContext& context, bool send_body) {
if (auto content_type = context.getProperty(InvokeHTTP::ContentType)) {
if (send_body)
client.setContentType(*content_type);
}
}
void setupClientTransferEncoding(extensions::curl::HTTPClient& client, bool use_chunked_encoding) {
if (use_chunked_encoding)
client.setRequestHeader("Transfer-Encoding", "chunked");
else
client.setRequestHeader("Transfer-Encoding", std::nullopt);
}
} // namespace
void InvokeHTTP::setupMembersFromProperties(const core::ProcessContext& context) {
context.getProperty(SendMessageBody, send_message_body_);
attributes_to_send_ = context.getProperty(AttributesToSend)
| utils::filter([](const std::string& s) { return !s.empty(); }) // avoid compiling an empty string to regex
| utils::map([](const std::string& regex_str) { return utils::Regex{regex_str}; })
| utils::orElse([this] { logger_->log_debug("%s is missing, so the default value will be used", std::string{AttributesToSend.name}); });
always_output_response_ = (context.getProperty(AlwaysOutputResponse) | utils::flatMap(&utils::StringUtils::toBool)).value_or(false);
penalize_no_retry_ = (context.getProperty(PenalizeOnNoRetry) | utils::flatMap(&utils::StringUtils::toBool)).value_or(false);
invalid_http_header_field_handling_strategy_ = utils::parseEnumProperty<invoke_http::InvalidHTTPHeaderFieldHandlingOption>(context, InvalidHTTPHeaderFieldHandlingStrategy);
put_response_body_in_attribute_ = context.getProperty(PutResponseBodyInAttribute);
if (put_response_body_in_attribute_ && put_response_body_in_attribute_->empty()) {
logger_->log_warn("%s is set to an empty string", std::string{PutResponseBodyInAttribute.name});
put_response_body_in_attribute_.reset();
}
use_chunked_encoding_ = (context.getProperty(UseChunkedEncoding) | utils::flatMap(&utils::StringUtils::toBool)).value_or(false);
send_date_header_ = context.getProperty<bool>(DateHeader).value_or(true);
}
std::unique_ptr<minifi::extensions::curl::HTTPClient> InvokeHTTP::createHTTPClientFromPropertiesAndMembers(const core::ProcessContext& context) const {
std::string method;
if (!context.getProperty(Method, method))
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Method property missing or invalid");
std::string url;
if (!context.getProperty(URL, url))
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "URL property missing or invalid");
std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service;
if (auto ssl_context_name = context.getProperty(SSLContext)) {
if (auto service = context.getControllerService(*ssl_context_name)) {
ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(service);
if (!ssl_context_service)
logger_->log_error("Controller service '%s' is not an SSLContextService", *ssl_context_name);
} else {
logger_->log_error("Couldn't find controller service with name '%s'", *ssl_context_name);
}
}
auto client = std::make_unique<minifi::extensions::curl::HTTPClient>();
client->initialize(std::move(method), std::move(url), std::move(ssl_context_service));
setupClientTimeouts(*client, context);
setupClientProxy(*client, context);
setupClientFollowRedirects(*client, context);
setupClientPeerVerification(*client, context);
setupClientContentType(*client, context, send_message_body_);
setupClientTransferEncoding(*client, use_chunked_encoding_);
return client;
}
void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
gsl_Expects(context);
setupMembersFromProperties(*context);
std::weak_ptr<core::ProcessContext> weak_context = context;
auto create_client = [this, weak_context]() -> std::unique_ptr<minifi::extensions::curl::HTTPClient> {
if (auto context = weak_context.lock())
return createHTTPClientFromPropertiesAndMembers(*context);
else
return nullptr;
};
client_queue_ = utils::ResourceQueue<extensions::curl::HTTPClient>::create(create_client, getMaxConcurrentTasks(), std::nullopt, logger_);
}
bool InvokeHTTP::shouldEmitFlowFile(minifi::extensions::curl::HTTPClient& client) {
auto method = client.getRequestMethod();
return ("POST" == method || "PUT" == method || "PATCH" == method);
}
/**
* Calls append_header with valid HTTP header keys, based on attributes_to_send_
* @param flow_file
* @param append_header Callback to append HTTP header to the request
* @return false when the flow file should be routed to failure, true otherwise
*/
bool InvokeHTTP::appendHeaders(const core::FlowFile& flow_file, /*std::invocable<std::string, std::string>*/ auto append_header) {
static_assert(std::is_invocable_v<decltype(append_header), std::string, std::string>);
if (!attributes_to_send_) return true;
const auto key_fn = [](const std::pair<std::string, std::string>& pair) { return pair.first; };
const auto original_attributes = flow_file.getAttributes();
// non-const views, because otherwise it doesn't satisfy viewable_range, and transform would fail
ranges::viewable_range auto matching_attributes = original_attributes
| ranges::views::filter([this](const auto& key) { return utils::regexMatch(key, *attributes_to_send_); }, key_fn);
switch (invalid_http_header_field_handling_strategy_) {
case invoke_http::InvalidHTTPHeaderFieldHandlingOption::fail:
if (ranges::any_of(matching_attributes, std::not_fn(&extensions::curl::HTTPClient::isValidHttpHeaderField), key_fn)) return false;
for (const auto& header: matching_attributes) append_header(header.first, header.second);
return true;
case invoke_http::InvalidHTTPHeaderFieldHandlingOption::drop:
for (const auto& header: matching_attributes | ranges::views::filter(&extensions::curl::HTTPClient::isValidHttpHeaderField, key_fn)) {
append_header(header.first, header.second);
}
return true;
case invoke_http::InvalidHTTPHeaderFieldHandlingOption::transform:
for (const auto& header: matching_attributes) {
append_header(extensions::curl::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName(header.first), header.second);
}
return true;
}
return true;
}
void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
gsl_Expects(session && context && client_queue_);
auto client = client_queue_->getResource();
onTriggerWithClient(context, session, *client);
}
void InvokeHTTP::onTriggerWithClient(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session,
minifi::extensions::curl::HTTPClient& client) {
auto flow_file = session->get();
if (flow_file == nullptr) {
if (!shouldEmitFlowFile(client)) {
logger_->log_debug("InvokeHTTP -- create flow file with %s", client.getRequestMethod());
flow_file = session->create();
} else {
logger_->log_debug("Exiting because method is %s and there is no flowfile available to execute it, yielding", client.getRequestMethod());
yield();
return;
}
} else {
logger_->log_debug("InvokeHTTP -- Received flowfile");
}
logger_->log_debug("onTrigger InvokeHTTP with %s to %s", client.getRequestMethod(), client.getURL());
const auto remove_callback_from_client_at_exit = gsl::finally([&client] {
client.setUploadCallback({});
});
std::string transaction_id = utils::IdGenerator::getIdGenerator()->generate().to_string();
if (shouldEmitFlowFile(client)) {
logger_->log_trace("InvokeHTTP -- reading flowfile");
const auto flow_file_reader_stream = session->getFlowFileContentStream(flow_file);
if (flow_file_reader_stream) {
std::unique_ptr<utils::HTTPUploadCallback> callback_obj;
if (send_message_body_) {
callback_obj = std::make_unique<utils::HTTPUploadStreamContentsCallback>(flow_file_reader_stream);
} else {
callback_obj = std::make_unique<utils::HTTPUploadByteArrayInputCallback>();
}
client.setUploadCallback(std::move(callback_obj));
logger_->log_trace("InvokeHTTP -- Setting callback, size is %d", flow_file->getSize());
if (!send_message_body_) {
client.setRequestHeader("Content-Length", "0");
} else if (!use_chunked_encoding_) {
client.setRequestHeader("Content-Length", std::to_string(flow_file->getSize()));
client.setPostSize(flow_file->getSize());
}
} else {
logger_->log_error("InvokeHTTP -- no resource claim");
}
} else {
logger_->log_trace("InvokeHTTP -- Not emitting flowfile to HTTP Server");
}
if (send_date_header_) {
auto current_time = std::chrono::floor<std::chrono::seconds>(std::chrono::system_clock::now());
client.setRequestHeader("Date", utils::timeutils::getRFC2616Format(current_time));
} else {
client.setRequestHeader("Date", std::nullopt);
}
const auto append_header = [&](const std::string& key, const std::string& value) { client.setRequestHeader(key, value); };
if (!appendHeaders(*flow_file, append_header)) {
session->transfer(flow_file, RelFailure);
return;
}
logger_->log_trace("InvokeHTTP -- curl performed");
if (client.submit()) {
logger_->log_trace("InvokeHTTP -- curl successful");
const std::vector<char>& response_body = client.getResponseBody();
const std::vector<std::string>& response_headers = client.getResponseHeaders();
int64_t http_code = client.getResponseCode();
const char* content_type = client.getContentType();
flow_file->addAttribute(STATUS_CODE, std::to_string(http_code));
if (!response_headers.empty()) { flow_file->addAttribute(STATUS_MESSAGE, response_headers.at(0)); }
flow_file->addAttribute(REQUEST_URL, client.getURL());
flow_file->addAttribute(TRANSACTION_ID, transaction_id);
bool is_success = ((http_code / 100) == 2);
logger_->log_debug("isSuccess: %d, response code %" PRId64, is_success, http_code);
std::shared_ptr<core::FlowFile> response_flow = nullptr;
if (is_success) {
if (!put_response_body_in_attribute_) {
if (flow_file != nullptr) {
response_flow = session->create(flow_file);
} else {
response_flow = session->create();
}
// if content type isn't returned we should return application/octet-stream
// as per RFC 2046 -- 4.5.1
response_flow->addAttribute(core::SpecialFlowAttribute::MIME_TYPE, content_type ? std::string(content_type) : DefaultContentType);
response_flow->addAttribute(STATUS_CODE, std::to_string(http_code));
if (!response_headers.empty()) { response_flow->addAttribute(STATUS_MESSAGE, response_headers.at(0)); }
response_flow->addAttribute(REQUEST_URL, client.getURL());
response_flow->addAttribute(TRANSACTION_ID, transaction_id);
io::BufferStream stream(gsl::make_span(response_body).as_span<const std::byte>());
// need an import from the data stream.
session->importFrom(stream, response_flow);
} else {
if (!response_body.empty()) {
std::string body_attribute_str{response_body.data(), response_body.size()};
flow_file->addAttribute(*put_response_body_in_attribute_, body_attribute_str);
}
}
}
route(flow_file, response_flow, session, context, is_success, http_code);
} else {
session->penalize(flow_file);
session->transfer(flow_file, RelFailure);
}
}
void InvokeHTTP::route(const std::shared_ptr<core::FlowFile>& request, const std::shared_ptr<core::FlowFile>& response, const std::shared_ptr<core::ProcessSession>& session,
const std::shared_ptr<core::ProcessContext>& context, bool is_success, int64_t status_code) {
// check if we should yield the processor
if (!is_success && request == nullptr) {
context->yield();
}
// If the property to output the response flowfile regardless of status code is set then transfer it
bool response_sent = false;
if (always_output_response_ && response != nullptr) {
logger_->log_debug("Outputting success and response");
session->transfer(response, RelResponse);
response_sent = true;
}
// transfer to the correct relationship
// 2xx -> SUCCESS
if (is_success) {
// we have two flowfiles to transfer
if (request != nullptr) {
session->transfer(request, Success);
}
if (response != nullptr && !response_sent) {
logger_->log_debug("Outputting success and response");
session->transfer(response, RelResponse);
}
// 5xx -> RETRY
} else if (status_code / 100 == 5) {
if (request != nullptr) {
session->penalize(request);
session->transfer(request, RelRetry);
}
// 1xx, 3xx, 4xx -> NO RETRY
} else {
if (request != nullptr) {
if (penalize_no_retry_) {
session->penalize(request);
}
session->transfer(request, RelNoRetry);
}
}
}
REGISTER_RESOURCE(InvokeHTTP, Processor);
} // namespace org::apache::nifi::minifi::processors