void InvokeHTTP::onTriggerWithClient()

in extensions/http-curl/processors/InvokeHTTP.cpp [217:330]


// Performs one HTTP request with `client` for a single trigger of the processor.
//
// Flow:
//  1. Take the next flow file from the session. If none is available, create an empty one when
//     shouldEmitFlowFile(client) is false; otherwise yield and return without doing anything.
//  2. When shouldEmitFlowFile(client) is true, install an upload callback on the client and set the
//     Content-Length related headers. The callback is cleared again when this function exits.
//  3. Set or clear the "Date" header, then let appendHeaders() add further request headers; if that
//     fails the flow file is transferred to RelFailure.
//  4. Submit the request. On a failed submit the flow file is penalized and transferred to RelFailure.
//     Otherwise response metadata is attached as attributes and, for 2xx status codes, the response
//     body is either imported into a new child flow file or stored in the attribute named by
//     put_response_body_in_attribute_. Final routing is delegated to route().
//
// @param context  process context, only forwarded to route()
// @param session  session used to get/create/transfer flow files and to import the response body
// @param client   HTTP client used for the request; its upload callback and request headers are modified
void InvokeHTTP::onTriggerWithClient(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session,
                                     minifi::extensions::curl::HTTPClient& client) {
  auto flow_file = session->get();

  if (flow_file == nullptr) {
    if (shouldEmitFlowFile(client)) {
      // The request needs flow file content to send, but nothing is queued.
      logger_->log_debug("Exiting because method is %s and there is no flowfile available to execute it, yielding", client.getRequestMethod());
      yield();
      return;
    }
    logger_->log_debug("InvokeHTTP -- create flow file with  %s", client.getRequestMethod());
    flow_file = session->create();
  } else {
    logger_->log_debug("InvokeHTTP -- Received flowfile");
  }

  logger_->log_debug("onTrigger InvokeHTTP with %s to %s", client.getRequestMethod(), client.getURL());

  // Always detach the upload callback from the client when leaving this function, on every exit path.
  const auto remove_callback_from_client_at_exit = gsl::finally([&client] {
    client.setUploadCallback({});
  });

  const std::string transaction_id = utils::IdGenerator::getIdGenerator()->generate().to_string();

  if (shouldEmitFlowFile(client)) {
    logger_->log_trace("InvokeHTTP -- reading flowfile");
    const auto flow_file_reader_stream = session->getFlowFileContentStream(flow_file);
    if (flow_file_reader_stream) {
      std::unique_ptr<utils::HTTPUploadCallback> callback_obj;
      if (send_message_body_) {
        callback_obj = std::make_unique<utils::HTTPUploadStreamContentsCallback>(flow_file_reader_stream);
      } else {
        callback_obj = std::make_unique<utils::HTTPUploadByteArrayInputCallback>();
      }
      client.setUploadCallback(std::move(callback_obj));

      // Printed with PRIu64: "%d" expects an int and does not match a 64-bit content size.
      // NOTE(review): assumes getSize() yields a non-negative value that fits uint64_t -- confirm against core::FlowFile.
      const uint64_t content_size = flow_file->getSize();
      logger_->log_trace("InvokeHTTP -- Setting callback, size is %" PRIu64, content_size);

      if (!send_message_body_) {
        client.setRequestHeader("Content-Length", "0");
      } else if (!use_chunked_encoding_) {
        // With chunked encoding enabled no explicit length is announced.
        client.setRequestHeader("Content-Length", std::to_string(content_size));
        client.setPostSize(content_size);
      }
    } else {
      logger_->log_error("InvokeHTTP -- no resource claim");
    }
  } else {
    logger_->log_trace("InvokeHTTP -- Not emitting flowfile to HTTP Server");
  }

  if (send_date_header_) {
    auto current_time = std::chrono::floor<std::chrono::seconds>(std::chrono::system_clock::now());
    client.setRequestHeader("Date", utils::timeutils::getRFC2616Format(current_time));
  } else {
    // Passing std::nullopt instead of a value for the "Date" header.
    client.setRequestHeader("Date", std::nullopt);
  }

  const auto append_header = [&](const std::string& key, const std::string& value) { client.setRequestHeader(key, value); };
  if (!appendHeaders(*flow_file, append_header)) {
    session->transfer(flow_file, RelFailure);
    return;
  }

  logger_->log_trace("InvokeHTTP -- curl performed");
  if (!client.submit()) {
    session->penalize(flow_file);
    session->transfer(flow_file, RelFailure);
    return;
  }
  logger_->log_trace("InvokeHTTP -- curl successful");

  const std::vector<char>& response_body = client.getResponseBody();
  const std::vector<std::string>& response_headers = client.getResponseHeaders();

  const int64_t http_code = client.getResponseCode();
  const char* content_type = client.getContentType();

  // Attaches the response metadata shared by the request flow file and the response flow file.
  const auto add_response_attributes = [&](core::FlowFile& target) {
    target.addAttribute(STATUS_CODE, std::to_string(http_code));
    // The first collected response header line is used as the status message.
    if (!response_headers.empty()) { target.addAttribute(STATUS_MESSAGE, response_headers.at(0)); }
    target.addAttribute(REQUEST_URL, client.getURL());
    target.addAttribute(TRANSACTION_ID, transaction_id);
  };

  add_response_attributes(*flow_file);

  const bool is_success = ((http_code / 100) == 2);

  logger_->log_debug("isSuccess: %d, response code %" PRId64, is_success, http_code);
  std::shared_ptr<core::FlowFile> response_flow = nullptr;

  if (is_success) {
    if (!put_response_body_in_attribute_) {
      // flow_file is never null here (it has already been dereferenced above), so the response
      // flow file is always created from it.
      response_flow = session->create(flow_file);

      // if content type isn't returned we should return application/octet-stream
      // as per RFC 2046 -- 4.5.1
      response_flow->addAttribute(core::SpecialFlowAttribute::MIME_TYPE, content_type ? std::string(content_type) : DefaultContentType);
      add_response_attributes(*response_flow);
      io::BufferStream stream(gsl::make_span(response_body).as_span<const std::byte>());
      // need an import from the data stream.
      session->importFrom(stream, response_flow);
    } else if (!response_body.empty()) {
      // The body is stored on the original flow file instead of producing a response flow file.
      std::string body_attribute_str{response_body.data(), response_body.size()};
      flow_file->addAttribute(*put_response_body_in_attribute_, body_attribute_str);
    }
  }
  route(flow_file, response_flow, session, context, is_success, http_code);
}