void InvokeHTTP::onTriggerWithClient()

in extensions/standard-processors/processors/InvokeHTTP.cpp [276:370]


// Performs one HTTP exchange for `flow_file` through `client` and routes the outcome.
//
// Sequence:
//   1. When shouldEmitFlowFile() is true and the session provides a content stream, an upload callback is
//      installed on the client: a stream-backed one if send_message_body_ is set, otherwise a
//      default-constructed HTTPUploadByteArrayInputCallback. Content-Length is set to "0" when no body is
//      sent, or to the flow file size (together with the post size) when a body is sent without chunked encoding.
//   2. The Date request header is set to the current time (truncated to seconds) or passed std::nullopt,
//      depending on send_date_header_.
//   3. appendHeaders() is given a setter for additional request headers; if it returns false the flow file
//      is transferred to RelFailure and no request is submitted.
//   4. The request is submitted. If submit() returns false the flow file is penalized and transferred to
//      RelFailure.
//   5. Otherwise status code, status line (first response header, trimmed), request URL and a generated
//      transaction id are written to the flow file. For 2xx responses the body is either imported into a
//      new child flow file or stored in the attribute named by put_response_body_in_attribute_.
//      Finally route() decides the relationships.
//
// @param context    process context, forwarded to route()
// @param session    session used to read content, create the response flow file and transfer
// @param flow_file  the request flow file; dereferenced unconditionally, so it must not be null
// @param client     pre-configured HTTP client; its upload callback is reset when this function exits
void InvokeHTTP::onTriggerWithClient(core::ProcessContext& context, core::ProcessSession& session,
    const std::shared_ptr<core::FlowFile>& flow_file, minifi::http::HTTPClient& client) {
  logger_->log_debug("onTrigger InvokeHTTP with {} to {}", magic_enum::enum_name(method_), client.getURL());

  // Whichever path leaves this function, hand the client an empty upload callback again.
  const auto reset_upload_callback_on_exit = gsl::finally([&client] { client.setUploadCallback({}); });

  const std::string request_transaction_id = utils::IdGenerator::getIdGenerator()->generate().to_string();

  if (!shouldEmitFlowFile()) {
    logger_->log_trace("InvokeHTTP -- Not emitting flowfile to HTTP Server");
  } else {
    logger_->log_trace("InvokeHTTP -- reading flowfile");
    const auto content_stream = session.getFlowFileContentStream(*flow_file);
    if (!content_stream) {
      // The request is still attempted below, just without an upload callback.
      logger_->log_error("InvokeHTTP -- no resource claim");
    } else {
      std::unique_ptr<http::HTTPUploadCallback> upload_callback;
      if (send_message_body_) {
        upload_callback = std::make_unique<http::HTTPUploadStreamContentsCallback>(content_stream);
      } else {
        upload_callback = std::make_unique<http::HTTPUploadByteArrayInputCallback>();
      }
      client.setUploadCallback(std::move(upload_callback));

      const auto content_size = flow_file->getSize();
      logger_->log_trace("InvokeHTTP -- Setting callback, size is {}", content_size);

      if (send_message_body_) {
        // With chunked encoding no explicit length is announced.
        if (!use_chunked_encoding_) {
          client.setRequestHeader("Content-Length", std::to_string(content_size));
          client.setPostSize(content_size);
        }
      } else {
        client.setRequestHeader("Content-Length", "0");
      }
    }
  }

  if (!send_date_header_) {
    client.setRequestHeader("Date", std::nullopt);
  } else {
    const auto now_in_seconds = std::chrono::floor<std::chrono::seconds>(std::chrono::system_clock::now());
    client.setRequestHeader("Date", utils::timeutils::getRFC2616Format(now_in_seconds));
  }

  const auto set_header_on_client = [&client](const std::string& key, const std::string& value) {
    client.setRequestHeader(key, value);
  };
  const bool headers_appended = appendHeaders(*flow_file, set_header_on_client);
  if (!headers_appended) {
    session.transfer(flow_file, RelFailure);
    return;
  }

  logger_->log_trace("InvokeHTTP -- curl performed");
  if (!client.submit()) {
    session.penalize(flow_file);
    session.transfer(flow_file, RelFailure);
    return;
  }
  logger_->log_trace("InvokeHTTP -- curl successful");

  const std::vector<char>& body = client.getResponseBody();
  const std::vector<std::string>& headers = client.getResponseHeaders();

  const int64_t http_code = client.getResponseCode();
  const char* content_type = client.getContentType();
  const std::string http_code_str = std::to_string(http_code);

  // The same four attributes go on the request flow file and, if one is created, on the response flow file.
  const auto write_exchange_attributes = [&](core::FlowFile& target) {
    target.addAttribute(STATUS_CODE, http_code_str);
    if (!headers.empty()) {
      target.addAttribute(STATUS_MESSAGE, utils::string::trim(headers.at(0)));
    }
    target.addAttribute(REQUEST_URL, client.getURL());
    target.addAttribute(TRANSACTION_ID, request_transaction_id);
  };

  write_exchange_attributes(*flow_file);

  const bool is_success = (http_code >= 200 && http_code < 300);
  logger_->log_debug("isSuccess: {}, response code {}", is_success, http_code);

  std::shared_ptr<core::FlowFile> response_flow;
  if (is_success) {
    if (put_response_body_in_attribute_) {
      // Body goes into an attribute of the request flow file; an empty body adds nothing.
      if (!body.empty()) {
        const std::string body_as_string(body.begin(), body.end());
        flow_file->addAttribute(*put_response_body_in_attribute_, body_as_string);
      }
    } else {
      response_flow = session.create(flow_file.get());

      // if content type isn't returned we should return application/octet-stream
      // as per RFC 2046 -- 4.5.1
      response_flow->addAttribute(core::SpecialFlowAttribute::MIME_TYPE, content_type ? std::string(content_type) : DefaultContentType);
      write_exchange_attributes(*response_flow);

      // Copy the buffered response body into the new flow file's content.
      io::BufferStream body_stream(gsl::make_span(body).as_span<const std::byte>());
      session.importFrom(body_stream, response_flow);
    }
  }

  route(flow_file, response_flow, session, context, is_success, http_code);
}