C2Payload RESTSender::sendPayload()

in libminifi/src/c2/protocols/RESTSender.cpp [118:217]


/**
 * Sends a C2 payload to the given URL over HTTP(S) and turns the reply into a C2Payload.
 *
 * @param url target endpoint; when empty no request is made and a READ_ERROR payload is returned
 * @param direction Direction::TRANSMIT results in a POST carrying a body, any other value results in a GET
 * @param payload payload being sent; its operation decides the request shape (for a transfer
 *                operation the nested payloads are uploaded as form parts named "file")
 * @param data optional request body used for non-transfer POST requests; it is gzip-compressed
 *             when the configured request encoding is gzip
 * @param accepted_formats when set, the raw response body is returned instead of being parsed as JSON;
 *                         for transfer operations a non-empty list is also sent as the Accept header
 * @return the response payload, or a READ_ERROR payload if the submit fails or a 4xx/5xx code is received
 * @throws std::logic_error if a nested payload of a transfer operation has an empty label
 */
C2Payload RESTSender::sendPayload(const std::string& url, const Direction direction, const C2Payload &payload, std::optional<std::string> data,
                                  const std::optional<std::vector<std::string>>& accepted_formats) {
  const auto operation = payload.getOperation();
  if (url.empty()) {
    return {operation, state::UpdateState::READ_ERROR};
  }
  const bool is_transfer = operation == Operation::transfer;

  // The client is declared before any callback so that the callbacks are still available when the client is destructed
  http::HTTPClient client(url, ssl_context_service_);
  client.setKeepAliveProbe(http::KeepAliveProbeData{2s, 2s});
  client.setConnectionTimeout(2s);

  // Sets the request method and, for https URLs only, the TLS configuration of the client
  const auto configure_request = [&](http::HttpRequestMethod method) {
    client.set_request_method(method);
    const bool is_https = url.rfind("https://", 0) == 0;
    if (!is_https) {
      return;
    }
    if (ssl_context_service_) {
      client.initialize(method, url, ssl_context_service_);
    } else {
      setSecurityContext(client, method, url);
    }
  };

  if (direction == Direction::TRANSMIT) {
    configure_request(http::HttpRequestMethod::POST);
    if (is_transfer) {
      // every nested payload is uploaded as a separate file part
      for (const auto& nested : payload.getNestedPayloads()) {
        const std::string part_filename = nested.getLabel();
        if (part_filename.empty()) {
          throw std::logic_error("Missing filename");
        }
        auto part_body = std::make_unique<http::HTTPUploadByteArrayInputCallback>();
        part_body->write(nested.getRawDataAsString());
        client.addFormPart("application/octet-stream", "file", std::move(part_body), part_filename);
      }
    } else {
      auto request_body = std::make_unique<http::HTTPUploadByteArrayInputCallback>();
      const bool wants_gzip = data.has_value() && req_encoding_ == RequestEncoding::gzip;
      if (!wants_gzip) {
        request_body->write(data.value_or(""));
      } else {
        io::BufferStream gzipped;
        // The compressor lives only inside this lambda, so it is destroyed before the buffer is taken over below
        const auto compress_into_buffer = [&]() -> bool {
          io::ZlibCompressStream compressor(gsl::make_not_null(&gzipped), io::ZlibCompressionFormat::GZIP, Z_BEST_COMPRESSION);
          const auto written = compressor.write(as_bytes(std::span(data.value())));
          if (written != data->length()) {
            return false;
          }
          compressor.close();
          return compressor.isFinished();
        };
        if (compress_into_buffer()) {
          request_body->setBuffer(gzipped.moveBuffer());
          client.setRequestHeader("Content-Encoding", "gzip");
        } else {
          logger_->log_error("Failed to compress request body, falling back to no compression");
          request_body->write(data.value());
        }
      }
      client.setPostSize(request_body->getBufferSize());
      client.setUploadCallback(std::move(request_body));
    }
  } else {
    // a GET uploads nothing, so no upload callback is registered
    configure_request(http::HttpRequestMethod::GET);
  }

  if (is_transfer) {
    client.setReadCallback(std::make_unique<http::HTTPReadCallback>(std::numeric_limits<size_t>::max()));
    if (accepted_formats.has_value() && !accepted_formats->empty()) {
      client.setRequestHeader("Accept", utils::string::join(", ", *accepted_formats));
    }
  } else {
    // Due to a bug in MiNiFi C2 the Accept header is not handled properly thus we need to exclude it to be compatible
    // TODO(lordgamez): The header should be re-added when the issue in MiNiFi C2 is fixed: https://issues.apache.org/jira/browse/NIFI-10535
    // client.setRequestHeader("Accept", "application/json");
    client.setContentType("application/json");
  }

  const bool submitted = client.submit();
  const int64_t response_code = client.getResponseCode();
  // covers both the client error (4xx) and the server error (5xx) ranges
  const bool is_error_code = 400 <= response_code && response_code < 600;
  if (is_error_code) {
    logger_->log_error("Error response code '{}' from '{}'", response_code, url);
  } else {
    logger_->log_debug("Response code '{}' from '{}'", response_code, url);
  }

  const auto response_body_bytes = gsl::make_span(client.getResponseBody()).as_span<const std::byte>();
  logger_->log_trace("Received response: \"{}\"", [&] { return utils::string::escapeUnprintableBytes(response_body_bytes); });

  if (!submitted || is_error_code) {
    return {operation, state::UpdateState::READ_ERROR};
  }
  if (!accepted_formats) {
    return parseJsonResponse(payload, response_body_bytes);
  }
  // the caller asked for specific formats: hand back the body untouched
  C2Payload raw_response(operation, state::UpdateState::READ_COMPLETE, true);
  raw_response.setRawData(response_body_bytes);
  return raw_response;
}