extensions/standard-processors/processors/InvokeHTTP.cpp (320 lines of code) (raw):

/** * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "InvokeHTTP.h" #include <cinttypes> #include <memory> #include <string> #include <utility> #include <vector> #include "core/FlowFile.h" #include "core/ProcessContext.h" #include "core/Resource.h" #include "io/BufferStream.h" #include "range/v3/algorithm/any_of.hpp" #include "range/v3/view/filter.hpp" #include "utils/OptionalUtils.h" #include "utils/ProcessorConfigUtils.h" #include "utils/gsl.h" using namespace std::literals::chrono_literals; namespace org::apache::nifi::minifi::processors { namespace invoke_http { HttpClientStore::HttpClientWrapper HttpClientStore::getClient(const std::string& url) { std::unique_lock lock(clients_mutex_); const auto it = std::find_if(std::begin(unused_clients_), std::end(unused_clients_), [&url](const auto& client) { return client->getURL() == url; }); if (it != std::end(unused_clients_)) { used_clients_.splice(used_clients_.end(), unused_clients_, it); return {*this, **it}; } if (used_clients_.size() + unused_clients_.size() < max_size_) { auto client = create_client_function_(url); used_clients_.push_back(std::move(client)); return {*this, *used_clients_.back()}; } else { cv_.wait(lock, [this] { return !unused_clients_.empty(); }); auto client = create_client_function_(url); unused_clients_.front() = std::move(client); used_clients_.splice(used_clients_.end(), unused_clients_, unused_clients_.begin()); return {*this, *used_clients_.back()}; } } void HttpClientStore::returnClient(http::HTTPClient& client) { std::unique_lock lock(clients_mutex_); const auto it = std::find_if(std::begin(used_clients_), std::end(used_clients_), [&client](const auto& elem) { return &client == elem.get(); }); if (it == std::end(used_clients_)) { logger_->log_error("Couldn't find HTTP client in client store to be returned"); return; } unused_clients_.splice(unused_clients_.end(), used_clients_, it); lock.unlock(); cv_.notify_one(); } } // namespace invoke_http namespace { nonstd::expected<std::string_view, std::error_code> removePerSecSuffix(const std::string_view input) { const auto trimmed_input = utils::string::trim(input); if (trimmed_input.ends_with("/s") || trimmed_input.ends_with("/S")) { return trimmed_input.substr(0, trimmed_input.size() - 2); } return nonstd::make_unexpected(core::ParsingErrorCode::GeneralParsingError); } } // namespace nonstd::expected<uint64_t, std::error_code> invoke_http::parseDataTransferSpeed(const std::string_view input) { return removePerSecSuffix(input) | utils::andThen(parsing::parseDataSize); } bool invoke_http::DataTransferSpeedValidator::validate(const std::string_view input) const { return parseDataTransferSpeed(input).has_value(); } std::string InvokeHTTP::DefaultContentType = "application/octet-stream"; void InvokeHTTP::initialize() { logger_->log_trace("Initializing InvokeHTTP"); setSupportedProperties(Properties); setSupportedRelationships(Relationships); } namespace { void setupClientTimeouts(http::HTTPClient& client, std::optional<std::chrono::milliseconds> connection_timeout, std::optional<std::chrono::milliseconds> read_timeout) { if (connection_timeout) client.setConnectionTimeout(*connection_timeout); if (read_timeout) client.setReadTimeout(*read_timeout); } void setupClientTransferEncoding(http::HTTPClient& client, bool use_chunked_encoding) { if (use_chunked_encoding) client.setRequestHeader("Transfer-Encoding", "chunked"); else client.setRequestHeader("Transfer-Encoding", std::nullopt); } } // namespace void InvokeHTTP::setupMembersFromProperties(const core::ProcessContext& context) { if (const auto url = context.getProperty(URL.name); !url || url->empty()) throw Exception(PROCESS_SCHEDULE_EXCEPTION, "URL property missing or empty"); method_ = utils::parseEnumProperty<http::HttpRequestMethod>(context, Method); send_message_body_ = utils::parseBoolProperty(context, SendMessageBody); attributes_to_send_ = context.getProperty(AttributesToSend) | utils::toOptional() | utils::filter([](const std::string& s) { return !s.empty(); }) // avoid compiling an empty string to regex | utils::transform([](const std::string& regex_str) { return utils::Regex{regex_str}; }) | utils::orElse([this] { logger_->log_debug("{} is missing, so the default value will be used", AttributesToSend.name); }); always_output_response_ = utils::parseOptionalBoolProperty(context, AlwaysOutputResponse).value_or(false); penalize_no_retry_ = (context.getProperty(PenalizeOnNoRetry) | utils::andThen(parsing::parseBool)).value_or(false); invalid_http_header_field_handling_strategy_ = utils::parseEnumProperty<invoke_http::InvalidHTTPHeaderFieldHandlingOption>(context, InvalidHTTPHeaderFieldHandlingStrategy); put_response_body_in_attribute_ = context.getProperty(PutResponseBodyInAttribute) | utils::toOptional(); if (put_response_body_in_attribute_ && put_response_body_in_attribute_->empty()) { logger_->log_warn("{} is set to an empty string", PutResponseBodyInAttribute.name); put_response_body_in_attribute_.reset(); } use_chunked_encoding_ = utils::parseBoolProperty(context, UseChunkedEncoding); send_date_header_ = utils::parseOptionalBoolProperty(context, DateHeader).value_or(true); maximum_upload_speed_ = context.getProperty(UploadSpeedLimit) | utils::andThen(invoke_http::parseDataTransferSpeed) | utils::toOptional(); maximum_download_speed_ = context.getProperty(DownloadSpeedLimit) | utils::andThen(invoke_http::parseDataTransferSpeed) | utils::toOptional(); connect_timeout_ = utils::parseDurationProperty(context, ConnectTimeout); // Shouldn't fail due to default value; read_timeout_ = utils::parseDurationProperty(context, ReadTimeout); // Shouldn't fail due to default value; proxy_.host = context.getProperty(InvokeHTTP::ProxyHost).value_or(""); proxy_.port = (context.getProperty(InvokeHTTP::ProxyPort) | utils::andThen(parsing::parseIntegral<int>)).value_or(0); std::string port_str; proxy_.username = context.getProperty(InvokeHTTP::ProxyUsername).value_or(""); proxy_.password = context.getProperty(InvokeHTTP::ProxyPassword).value_or(""); follow_redirects_ = utils::parseBoolProperty(context, FollowRedirects); // Shouldn't fail due to default value; content_type_ = utils::parseProperty(context, InvokeHTTP::ContentType); // Shouldn't fail due to default value; if (auto ssl_context_name = context.getProperty(SSLContext)) { if (auto service = context.getControllerService(*ssl_context_name, getUUID())) { ssl_context_service_ = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(service); if (!ssl_context_service_) logger_->log_error("Controller service '{}' is not an SSLContextService", *ssl_context_name); } else { logger_->log_error("Couldn't find controller service with name '{}'", *ssl_context_name); } } } gsl::not_null<std::unique_ptr<http::HTTPClient>> InvokeHTTP::createHTTPClientFromMembers(const std::string& url) const { auto client = std::make_unique<http::HTTPClient>(); client->initialize(method_, url, ssl_context_service_); setupClientTimeouts(*client, connect_timeout_, read_timeout_); client->setHTTPProxy(proxy_); client->setFollowRedirects(follow_redirects_); if (send_message_body_ && content_type_) client->setContentType(*content_type_); setupClientTransferEncoding(*client, use_chunked_encoding_); if (maximum_upload_speed_) { client->setMaximumUploadSpeed(*maximum_upload_speed_); } if (maximum_download_speed_) { client->setMaximumDownloadSpeed(*maximum_download_speed_); } return gsl::make_not_null(std::move(client)); } void InvokeHTTP::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { setupMembersFromProperties(context); auto create_client = [this](const std::string& url) -> gsl::not_null<std::unique_ptr<minifi::http::HTTPClient>> { return createHTTPClientFromMembers(url); }; client_queue_ = std::make_unique<invoke_http::HttpClientStore>(getMaxConcurrentTasks() * 2, create_client); } bool InvokeHTTP::shouldEmitFlowFile() const { return (http::HttpRequestMethod::POST == method_ || http::HttpRequestMethod::PUT == method_ || http::HttpRequestMethod::PATCH == method_); } /** * Calls append_header with valid HTTP header keys, based on attributes_to_send_ * @param flow_file * @param append_header Callback to append HTTP header to the request * @return false when the flow file should be routed to failure, true otherwise */ bool InvokeHTTP::appendHeaders(const core::FlowFile& flow_file, /*std::invocable<std::string, std::string>*/ auto append_header) { static_assert(std::is_invocable_v<decltype(append_header), std::string, std::string>); if (!attributes_to_send_) return true; const auto key_fn = [](const std::pair<std::string, std::string>& pair) { return pair.first; }; const auto original_attributes = flow_file.getAttributes(); // non-const views, because otherwise it doesn't satisfy viewable_range, and transform would fail ranges::viewable_range auto matching_attributes = original_attributes | ranges::views::filter([this](const auto& key) { return utils::regexMatch(key, *attributes_to_send_); }, key_fn); switch (invalid_http_header_field_handling_strategy_) { case invoke_http::InvalidHTTPHeaderFieldHandlingOption::fail: if (ranges::any_of(matching_attributes, std::not_fn(&http::HTTPClient::isValidHttpHeaderField), key_fn)) return false; for (const auto& header: matching_attributes) append_header(header.first, http::HTTPClient::removeInvalidCharactersFromHttpHeaderFieldBody(header.second)); return true; case invoke_http::InvalidHTTPHeaderFieldHandlingOption::drop: for (const auto& header: matching_attributes | ranges::views::filter(&http::HTTPClient::isValidHttpHeaderField, key_fn)) { append_header(header.first, http::HTTPClient::removeInvalidCharactersFromHttpHeaderFieldBody(header.second)); } return true; case invoke_http::InvalidHTTPHeaderFieldHandlingOption::transform: for (const auto& header: matching_attributes) { append_header(http::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName(header.first), http::HTTPClient::removeInvalidCharactersFromHttpHeaderFieldBody(header.second)); } return true; } return true; } void InvokeHTTP::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { gsl_Expects(client_queue_); auto flow_file = session.get(); if (flow_file == nullptr) { if (!shouldEmitFlowFile()) { logger_->log_debug("InvokeHTTP -- create flow file with {}", magic_enum::enum_name(method_)); flow_file = session.create(); } else { logger_->log_debug("Exiting because method is {} and there is no flowfile available to execute it, yielding", magic_enum::enum_name(method_)); yield(); return; } } else { logger_->log_debug("InvokeHTTP -- Received flowfile"); } auto url = context.getProperty(URL, flow_file.get()); if (!url || url->empty()) { logger_->log_error("InvokeHTTP -- URL is empty, transferring to failure"); session.transfer(flow_file, RelFailure); return; } auto client = client_queue_->getClient(*url); onTriggerWithClient(context, session, flow_file, client.get()); } void InvokeHTTP::onTriggerWithClient(core::ProcessContext& context, core::ProcessSession& session, const std::shared_ptr<core::FlowFile>& flow_file, minifi::http::HTTPClient& client) { logger_->log_debug("onTrigger InvokeHTTP with {} to {}", magic_enum::enum_name(method_), client.getURL()); const auto remove_callback_from_client_at_exit = gsl::finally([&client] { client.setUploadCallback({}); }); std::string transaction_id = utils::IdGenerator::getIdGenerator()->generate().to_string(); if (shouldEmitFlowFile()) { logger_->log_trace("InvokeHTTP -- reading flowfile"); const auto flow_file_reader_stream = session.getFlowFileContentStream(*flow_file); if (flow_file_reader_stream) { std::unique_ptr<http::HTTPUploadCallback> callback_obj; if (send_message_body_) { callback_obj = std::make_unique<http::HTTPUploadStreamContentsCallback>(flow_file_reader_stream); } else { callback_obj = std::make_unique<http::HTTPUploadByteArrayInputCallback>(); } client.setUploadCallback(std::move(callback_obj)); logger_->log_trace("InvokeHTTP -- Setting callback, size is {}", flow_file->getSize()); if (!send_message_body_) { client.setRequestHeader("Content-Length", "0"); } else if (!use_chunked_encoding_) { client.setRequestHeader("Content-Length", std::to_string(flow_file->getSize())); client.setPostSize(flow_file->getSize()); } } else { logger_->log_error("InvokeHTTP -- no resource claim"); } } else { logger_->log_trace("InvokeHTTP -- Not emitting flowfile to HTTP Server"); } if (send_date_header_) { auto current_time = std::chrono::floor<std::chrono::seconds>(std::chrono::system_clock::now()); client.setRequestHeader("Date", utils::timeutils::getRFC2616Format(current_time)); } else { client.setRequestHeader("Date", std::nullopt); } const auto append_header = [&](const std::string& key, const std::string& value) { client.setRequestHeader(key, value); }; if (!appendHeaders(*flow_file, append_header)) { session.transfer(flow_file, RelFailure); return; } logger_->log_trace("InvokeHTTP -- curl performed"); if (client.submit()) { logger_->log_trace("InvokeHTTP -- curl successful"); const std::vector<char>& response_body = client.getResponseBody(); const std::vector<std::string>& response_headers = client.getResponseHeaders(); int64_t http_code = client.getResponseCode(); const char* content_type = client.getContentType(); flow_file->addAttribute(STATUS_CODE, std::to_string(http_code)); if (!response_headers.empty()) { flow_file->addAttribute(STATUS_MESSAGE, utils::string::trim(response_headers.at(0))); } flow_file->addAttribute(REQUEST_URL, client.getURL()); flow_file->addAttribute(TRANSACTION_ID, transaction_id); bool is_success = ((http_code / 100) == 2); logger_->log_debug("isSuccess: {}, response code {}", is_success, http_code); std::shared_ptr<core::FlowFile> response_flow = nullptr; if (is_success) { if (!put_response_body_in_attribute_) { response_flow = session.create(flow_file.get()); // if content type isn't returned we should return application/octet-stream // as per RFC 2046 -- 4.5.1 response_flow->addAttribute(core::SpecialFlowAttribute::MIME_TYPE, content_type ? std::string(content_type) : DefaultContentType); response_flow->addAttribute(STATUS_CODE, std::to_string(http_code)); if (!response_headers.empty()) { response_flow->addAttribute(STATUS_MESSAGE, utils::string::trim(response_headers.at(0))); } response_flow->addAttribute(REQUEST_URL, client.getURL()); response_flow->addAttribute(TRANSACTION_ID, transaction_id); io::BufferStream stream(gsl::make_span(response_body).as_span<const std::byte>()); // need an import from the data stream. session.importFrom(stream, response_flow); } else { if (!response_body.empty()) { std::string body_attribute_str{response_body.data(), response_body.size()}; flow_file->addAttribute(*put_response_body_in_attribute_, body_attribute_str); } } } route(flow_file, response_flow, session, context, is_success, http_code); } else { session.penalize(flow_file); session.transfer(flow_file, RelFailure); } } void InvokeHTTP::route(const std::shared_ptr<core::FlowFile>& request, const std::shared_ptr<core::FlowFile>& response, core::ProcessSession& session, core::ProcessContext& context, bool is_success, int64_t status_code) { // check if we should yield the processor if (!is_success && request == nullptr) { context.yield(); } // If the property to output the response flowfile regardless of status code is set then transfer it bool response_sent = false; if (always_output_response_ && response != nullptr) { logger_->log_debug("Outputting success and response"); session.transfer(response, RelResponse); response_sent = true; } // transfer to the correct relationship // 2xx -> SUCCESS if (is_success) { // we have two flowfiles to transfer if (request != nullptr) { session.transfer(request, Success); } if (response != nullptr && !response_sent) { logger_->log_debug("Outputting success and response"); session.transfer(response, RelResponse); } // 5xx -> RETRY } else if (status_code / 100 == 5) { if (request != nullptr) { session.penalize(request); session.transfer(request, RelRetry); } // 1xx, 3xx, 4xx -> NO RETRY } else { if (request != nullptr) { if (penalize_no_retry_) { session.penalize(request); } session.transfer(request, RelNoRetry); } } } REGISTER_RESOURCE(InvokeHTTP, Processor); } // namespace org::apache::nifi::minifi::processors