extensions/http-curl/processors/InvokeHTTP.cpp (287 lines of code) (raw):
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "InvokeHTTP.h"
#include <cinttypes>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "core/FlowFile.h"
#include "core/ProcessContext.h"
#include "core/Relationship.h"
#include "core/Resource.h"
#include "io/BufferStream.h"
#include "io/StreamFactory.h"
#include "ResourceClaim.h"
#include "utils/gsl.h"
#include "utils/ProcessorConfigUtils.h"
#include "utils/OptionalUtils.h"
#include "range/v3/view/filter.hpp"
#include "range/v3/algorithm/any_of.hpp"
namespace org::apache::nifi::minifi::processors {
// MIME type used for response flow files when the client reports no content type.
std::string InvokeHTTP::DefaultContentType{"application/octet-stream"};
void InvokeHTTP::initialize() {
  logger_->log_trace("Initializing InvokeHTTP");
  setSupportedProperties(Properties);
  setSupportedRelationships(Relationships);
}
namespace {
/**
 * Copies the connect and read timeouts from the processor properties onto the client.
 * A timeout whose property yields no value is not set on the client.
 * @param client HTTP client to configure
 * @param context source of the ConnectTimeout and ReadTimeout properties
 */
void setupClientTimeouts(extensions::curl::HTTPClient& client, const core::ProcessContext& context) {
  auto connect_timeout = context.getProperty<core::TimePeriodValue>(InvokeHTTP::ConnectTimeout);
  if (connect_timeout) {
    client.setConnectionTimeout(connect_timeout->getMilliseconds());
  }

  auto response_timeout = context.getProperty<core::TimePeriodValue>(InvokeHTTP::ReadTimeout);
  if (response_timeout) {
    client.setReadTimeout(response_timeout->getMilliseconds());
  }
}
/**
 * Builds a proxy description from the ProxyHost, ProxyPort, ProxyUsername and ProxyPassword
 * properties and hands it to the client. The proxy description is always applied,
 * whether or not any of the properties produced a value.
 * @param client HTTP client to configure
 * @param context source of the proxy properties
 */
void setupClientProxy(extensions::curl::HTTPClient& client, const core::ProcessContext& context) {
  utils::HTTPProxy proxy_settings = {};

  context.getProperty(InvokeHTTP::ProxyHost, proxy_settings.host);

  std::string port_text;
  if (context.getProperty(InvokeHTTP::ProxyPort, port_text)) {
    if (!port_text.empty()) {
      // the result of the conversion is not inspected here
      core::Property::StringToInt(port_text, proxy_settings.port);
    }
  }

  context.getProperty(InvokeHTTP::ProxyUsername, proxy_settings.username);
  context.getProperty(InvokeHTTP::ProxyPassword, proxy_settings.password);

  client.setHTTPProxy(proxy_settings);
}
/**
 * Applies the DisablePeerVerification property to the client.
 *
 * The property states whether verification should be *disabled*, while the client setter
 * receives the verification state itself, so the parsed flag is negated before being passed on.
 * When the property has no value or cannot be parsed as a boolean, the client is left untouched.
 * @param client HTTP client to configure
 * @param context source of the DisablePeerVerification property
 */
void setupClientPeerVerification(extensions::curl::HTTPClient& client, const core::ProcessContext& context) {
  if (auto disable_peer_verification = context.getProperty(InvokeHTTP::DisablePeerVerification) | utils::flatMap(&utils::StringUtils::toBool)) {
    // NOTE(review): assumes setPeerVerification(true) turns verification on, as its name suggests -- confirm against HTTPClient
    client.setPeerVerification(!*disable_peer_verification);
  }
}
/**
 * Forwards the FollowRedirects property to the client when the property yields a value.
 * @param client HTTP client to configure
 * @param context source of the FollowRedirects property
 */
void setupClientFollowRedirects(extensions::curl::HTTPClient& client, const core::ProcessContext& context) {
  auto should_follow = context.getProperty<bool>(InvokeHTTP::FollowRedirects);
  if (!should_follow) {
    return;
  }
  client.setFollowRedirects(*should_follow);
}
/**
 * Sets the request content type on the client, but only when a body is going to be sent
 * and the ContentType property yields a value.
 * @param client HTTP client to configure
 * @param context source of the ContentType property
 * @param send_body whether the request will carry a message body
 */
void setupClientContentType(extensions::curl::HTTPClient& client, const core::ProcessContext& context, bool send_body) {
  auto configured_content_type = context.getProperty(InvokeHTTP::ContentType);
  if (configured_content_type && send_body) {
    client.setContentType(*configured_content_type);
  }
}
/**
 * Sets the Transfer-Encoding request header to "chunked" when chunked encoding is requested;
 * otherwise passes std::nullopt as the header value.
 * @param client HTTP client to configure
 * @param use_chunked_encoding whether chunked transfer encoding should be used
 */
void setupClientTransferEncoding(extensions::curl::HTTPClient& client, bool use_chunked_encoding) {
  if (!use_chunked_encoding) {
    client.setRequestHeader("Transfer-Encoding", std::nullopt);
    return;
  }
  client.setRequestHeader("Transfer-Encoding", "chunked");
}
}  // namespace
/**
 * Reads the processor properties that are cached in member variables.
 *
 * Boolean flags read through StringUtils::toBool fall back to false when the property is
 * missing or unparsable; send_date_header_ falls back to true. An empty
 * PutResponseBodyInAttribute value is treated as if the property was not set.
 * @param context source of the property values
 */
void InvokeHTTP::setupMembersFromProperties(const core::ProcessContext& context) {
  // reads a property as a boolean, treating a missing or unparsable value as false
  const auto read_flag_or_false = [&context](const auto& property) -> bool {
    return (context.getProperty(property) | utils::flatMap(&utils::StringUtils::toBool)).value_or(false);
  };

  context.getProperty(SendMessageBody, send_message_body_);

  attributes_to_send_ = context.getProperty(AttributesToSend)
                        | utils::filter([](const std::string& pattern) { return !pattern.empty(); })  // avoid compiling an empty string to regex
                        | utils::map([](const std::string& pattern) { return utils::Regex{pattern}; })
                        | utils::orElse([this] { logger_->log_debug("%s is missing, so the default value will be used", std::string{AttributesToSend.name}); });

  always_output_response_ = read_flag_or_false(AlwaysOutputResponse);
  penalize_no_retry_ = read_flag_or_false(PenalizeOnNoRetry);

  invalid_http_header_field_handling_strategy_ = utils::parseEnumProperty<invoke_http::InvalidHTTPHeaderFieldHandlingOption>(context, InvalidHTTPHeaderFieldHandlingStrategy);

  put_response_body_in_attribute_ = context.getProperty(PutResponseBodyInAttribute);
  const bool attribute_name_is_blank = put_response_body_in_attribute_ && put_response_body_in_attribute_->empty();
  if (attribute_name_is_blank) {
    logger_->log_warn("%s is set to an empty string", std::string{PutResponseBodyInAttribute.name});
    put_response_body_in_attribute_.reset();
  }

  use_chunked_encoding_ = read_flag_or_false(UseChunkedEncoding);
  send_date_header_ = context.getProperty<bool>(DateHeader).value_or(true);
}
/**
 * Creates and configures an HTTP client from the processor properties and the members
 * previously filled in by setupMembersFromProperties().
 *
 * A configured SSL context service that cannot be found, or that is not an SSLContextService,
 * is only logged as an error; the client is then initialized with a null service.
 * @param context source of the property values and controller services
 * @return the configured client
 * @throws Exception (PROCESS_SCHEDULE_EXCEPTION) when the Method or URL property cannot be read
 */
std::unique_ptr<minifi::extensions::curl::HTTPClient> InvokeHTTP::createHTTPClientFromPropertiesAndMembers(const core::ProcessContext& context) const {
  std::string http_method;
  if (!context.getProperty(Method, http_method)) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Method property missing or invalid");
  }

  std::string target_url;
  if (!context.getProperty(URL, target_url)) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "URL property missing or invalid");
  }

  std::shared_ptr<minifi::controllers::SSLContextService> ssl_service;
  if (auto ssl_service_name = context.getProperty(SSLContext)) {
    auto controller_service = context.getControllerService(*ssl_service_name);
    if (!controller_service) {
      logger_->log_error("Couldn't find controller service with name '%s'", *ssl_service_name);
    } else {
      ssl_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(controller_service);
      if (!ssl_service) {
        logger_->log_error("Controller service '%s' is not an SSLContextService", *ssl_service_name);
      }
    }
  }

  auto http_client = std::make_unique<minifi::extensions::curl::HTTPClient>();
  http_client->initialize(std::move(http_method), std::move(target_url), std::move(ssl_service));

  setupClientTimeouts(*http_client, context);
  setupClientProxy(*http_client, context);
  setupClientFollowRedirects(*http_client, context);
  setupClientPeerVerification(*http_client, context);
  setupClientContentType(*http_client, context, send_message_body_);
  setupClientTransferEncoding(*http_client, use_chunked_encoding_);

  return http_client;
}
/**
 * Caches the property-derived members and creates the queue of HTTP clients.
 *
 * The client factory handed to the queue only holds a weak reference to the context;
 * if the context is gone by the time a client is requested, the factory returns nullptr.
 * @param context processor context; must not be null
 */
void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
  gsl_Expects(context);

  setupMembersFromProperties(*context);

  std::weak_ptr<core::ProcessContext> context_observer = context;
  auto client_factory = [this, context_observer]() -> std::unique_ptr<minifi::extensions::curl::HTTPClient> {
    auto locked_context = context_observer.lock();
    if (!locked_context) {
      return nullptr;
    }
    return createHTTPClientFromPropertiesAndMembers(*locked_context);
  };

  client_queue_ = utils::ResourceQueue<extensions::curl::HTTPClient>::create(client_factory, getMaxConcurrentTasks(), std::nullopt, logger_);
}
/**
 * Tells whether the client's request method is one of POST, PUT or PATCH,
 * i.e. the methods for which the flow file content is uploaded.
 * @param client client whose request method is inspected
 * @return true for POST, PUT and PATCH, false otherwise
 */
bool InvokeHTTP::shouldEmitFlowFile(minifi::extensions::curl::HTTPClient& client) {
  auto method = client.getRequestMethod();
  for (const char* body_carrying_method : {"POST", "PUT", "PATCH"}) {
    if (body_carrying_method == method) {
      return true;
    }
  }
  return false;
}
/**
 * Passes the flow file attributes whose names match attributes_to_send_ to append_header,
 * handling invalid HTTP header field names according to invalid_http_header_field_handling_strategy_:
 *  - fail: nothing is appended and false is returned if any matching name is invalid
 *  - drop: only attributes with valid names are appended
 *  - transform: invalid characters in the names are replaced before appending
 * @param flow_file flow file whose attributes are considered
 * @param append_header Callback to append HTTP header to the request, called with (name, value)
 * @return false when the flow file should be routed to failure, true otherwise
 */
bool InvokeHTTP::appendHeaders(const core::FlowFile& flow_file, /*std::invocable<std::string, std::string>*/ auto append_header) {
  static_assert(std::is_invocable_v<decltype(append_header), std::string, std::string>);

  // without a regex no attribute is selected, so there is nothing to append
  if (!attributes_to_send_) {
    return true;
  }

  const auto attribute_name = [](const std::pair<std::string, std::string>& attribute) { return attribute.first; };
  const auto all_attributes = flow_file.getAttributes();
  // non-const views, because otherwise it doesn't satisfy viewable_range, and transform would fail
  ranges::viewable_range auto selected_attributes = all_attributes
      | ranges::views::filter([this](const auto& name) { return utils::regexMatch(name, *attributes_to_send_); }, attribute_name);

  using Strategy = invoke_http::InvalidHTTPHeaderFieldHandlingOption;
  switch (invalid_http_header_field_handling_strategy_) {
    case Strategy::fail: {
      const bool has_invalid_name = ranges::any_of(selected_attributes, std::not_fn(&extensions::curl::HTTPClient::isValidHttpHeaderField), attribute_name);
      if (has_invalid_name) {
        return false;
      }
      for (const auto& [name, value] : selected_attributes) {
        append_header(name, value);
      }
      break;
    }
    case Strategy::drop: {
      for (const auto& [name, value] : selected_attributes | ranges::views::filter(&extensions::curl::HTTPClient::isValidHttpHeaderField, attribute_name)) {
        append_header(name, value);
      }
      break;
    }
    case Strategy::transform: {
      for (const auto& [name, value] : selected_attributes) {
        append_header(extensions::curl::HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName(name), value);
      }
      break;
    }
  }
  return true;
}
/**
 * Takes an HTTP client from the client queue and runs one trigger with it.
 * @param context processor context; must not be null
 * @param session process session; must not be null
 */
void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
  gsl_Expects(session && context && client_queue_);

  auto queued_client = client_queue_->getResource();
  onTriggerWithClient(context, session, *queued_client);
}
/**
 * Performs one HTTP exchange with the given client and routes the resulting flow files.
 *
 * Outline:
 *  - take the incoming flow file; when there is none, create one if the method does not upload
 *    content, otherwise yield and return,
 *  - for POST/PUT/PATCH attach an upload callback and set Content-Length as applicable,
 *  - set (or clear) the Date header and append the headers derived from flow file attributes;
 *    if appending fails the flow file goes to RelFailure,
 *  - submit the request; on a failed submit the flow file is penalized and sent to RelFailure,
 *  - otherwise annotate the request flow file with the outcome and, for 2xx responses, either
 *    create a response flow file with the body or store the body in an attribute, then call route().
 *
 * @param context processor context, forwarded to route()
 * @param session session used to get, create, penalize and transfer flow files
 * @param client HTTP client used for the request; its upload callback is reset when this function exits
 */
void InvokeHTTP::onTriggerWithClient(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session,
                                     minifi::extensions::curl::HTTPClient& client) {
  auto flow_file = session->get();

  if (flow_file == nullptr) {
    if (!shouldEmitFlowFile(client)) {
      logger_->log_debug("InvokeHTTP -- create flow file with  %s", client.getRequestMethod());
      flow_file = session->create();
    } else {
      logger_->log_debug("Exiting because method is %s and there is no flowfile available to execute it, yielding", client.getRequestMethod());
      yield();
      return;
    }
  } else {
    logger_->log_debug("InvokeHTTP -- Received flowfile");
  }

  logger_->log_debug("onTrigger InvokeHTTP with %s to %s", client.getRequestMethod(), client.getURL());

  // the upload callback set below must not stay on the client after this function returns
  const auto remove_callback_from_client_at_exit = gsl::finally([&client] {
    client.setUploadCallback({});
  });

  std::string transaction_id = utils::IdGenerator::getIdGenerator()->generate().to_string();

  if (shouldEmitFlowFile(client)) {
    logger_->log_trace("InvokeHTTP -- reading flowfile");
    const auto flow_file_reader_stream = session->getFlowFileContentStream(flow_file);
    if (flow_file_reader_stream) {
      std::unique_ptr<utils::HTTPUploadCallback> callback_obj;
      if (send_message_body_) {
        callback_obj = std::make_unique<utils::HTTPUploadStreamContentsCallback>(flow_file_reader_stream);
      } else {
        callback_obj = std::make_unique<utils::HTTPUploadByteArrayInputCallback>();
      }
      client.setUploadCallback(std::move(callback_obj));
      // explicit cast so that the argument always matches the PRIu64 conversion specifier
      logger_->log_trace("InvokeHTTP -- Setting callback, size is %" PRIu64, static_cast<uint64_t>(flow_file->getSize()));

      if (!send_message_body_) {
        client.setRequestHeader("Content-Length", "0");
      } else if (!use_chunked_encoding_) {
        client.setRequestHeader("Content-Length", std::to_string(flow_file->getSize()));
        client.setPostSize(flow_file->getSize());
      }
    } else {
      logger_->log_error("InvokeHTTP -- no resource claim");
    }
  } else {
    logger_->log_trace("InvokeHTTP -- Not emitting flowfile to HTTP Server");
  }

  if (send_date_header_) {
    auto current_time = std::chrono::floor<std::chrono::seconds>(std::chrono::system_clock::now());
    client.setRequestHeader("Date", utils::timeutils::getRFC2616Format(current_time));
  } else {
    client.setRequestHeader("Date", std::nullopt);
  }

  const auto append_header = [&](const std::string& key, const std::string& value) { client.setRequestHeader(key, value); };
  if (!appendHeaders(*flow_file, append_header)) {
    session->transfer(flow_file, RelFailure);
    return;
  }

  logger_->log_trace("InvokeHTTP -- curl performed");
  if (!client.submit()) {
    session->penalize(flow_file);
    session->transfer(flow_file, RelFailure);
    return;
  }

  logger_->log_trace("InvokeHTTP -- curl successful");

  const std::vector<char>& response_body = client.getResponseBody();
  const std::vector<std::string>& response_headers = client.getResponseHeaders();

  int64_t http_code = client.getResponseCode();
  const char* content_type = client.getContentType();

  // the same outcome attributes are written to the request and to the response flow file
  const auto add_response_attributes = [&](core::FlowFile& target) {
    target.addAttribute(STATUS_CODE, std::to_string(http_code));
    if (!response_headers.empty()) { target.addAttribute(STATUS_MESSAGE, response_headers.at(0)); }
    target.addAttribute(REQUEST_URL, client.getURL());
    target.addAttribute(TRANSACTION_ID, transaction_id);
  };

  add_response_attributes(*flow_file);

  bool is_success = ((http_code / 100) == 2);

  logger_->log_debug("isSuccess: %d, response code %" PRId64, is_success, http_code);
  std::shared_ptr<core::FlowFile> response_flow = nullptr;

  if (is_success) {
    if (!put_response_body_in_attribute_) {
      // flow_file is never null here: it has already been dereferenced above
      response_flow = session->create(flow_file);

      // if content type isn't returned we should return application/octet-stream
      // as per RFC 2046 -- 4.5.1
      response_flow->addAttribute(core::SpecialFlowAttribute::MIME_TYPE, content_type ? std::string(content_type) : DefaultContentType);
      add_response_attributes(*response_flow);
      io::BufferStream stream(gsl::make_span(response_body).as_span<const std::byte>());
      // need an import from the data stream.
      session->importFrom(stream, response_flow);
    } else {
      if (!response_body.empty()) {
        std::string body_attribute_str{response_body.data(), response_body.size()};
        flow_file->addAttribute(*put_response_body_in_attribute_, body_attribute_str);
      }
    }
  }
  route(flow_file, response_flow, session, context, is_success, http_code);
}
/**
 * Transfers the request and response flow files to the relationship matching the HTTP outcome.
 *
 *  - 2xx: request -> Success, response -> RelResponse
 *  - 5xx: request is penalized and goes to RelRetry
 *  - anything else: request goes to RelNoRetry, penalized only if penalize_no_retry_ is set
 *
 * When always_output_response_ is set, an existing response flow file is transferred to
 * RelResponse regardless of the status code (and only once).
 * @param request the request flow file, may be null
 * @param response the response flow file, may be null
 * @param session session performing the transfers
 * @param context context that is yielded on failure without a request flow file
 * @param is_success whether the status code was 2xx
 * @param status_code HTTP status code of the response
 */
void InvokeHTTP::route(const std::shared_ptr<core::FlowFile>& request, const std::shared_ptr<core::FlowFile>& response, const std::shared_ptr<core::ProcessSession>& session,
                       const std::shared_ptr<core::ProcessContext>& context, bool is_success, int64_t status_code) {
  const bool has_request = (request != nullptr);
  const bool has_response = (response != nullptr);

  // check if we should yield the processor
  if (!is_success && !has_request) {
    context->yield();
  }

  // If the property to output the response flowfile regardless of status code is set then transfer it
  bool response_sent = false;
  if (always_output_response_ && has_response) {
    logger_->log_debug("Outputting success and response");
    session->transfer(response, RelResponse);
    response_sent = true;
  }

  // 2xx -> SUCCESS: there may be two flowfiles to transfer
  if (is_success) {
    if (has_request) {
      session->transfer(request, Success);
    }
    if (has_response && !response_sent) {
      logger_->log_debug("Outputting success and response");
      session->transfer(response, RelResponse);
    }
    return;
  }

  // every remaining case only concerns the request flowfile
  if (!has_request) {
    return;
  }

  // 5xx -> RETRY
  if (status_code / 100 == 5) {
    session->penalize(request);
    session->transfer(request, RelRetry);
    return;
  }

  // 1xx, 3xx, 4xx -> NO RETRY
  if (penalize_no_retry_) {
    session->penalize(request);
  }
  session->transfer(request, RelNoRetry);
}
// Registers InvokeHTTP as a Processor resource via the REGISTER_RESOURCE macro
// (presumably provided by core/Resource.h -- macro definition is not visible in this file).
REGISTER_RESOURCE(InvokeHTTP, Processor);
}  // namespace org::apache::nifi::minifi::processors