in libminifi/src/c2/protocols/RESTSender.cpp [118:217]
C2Payload RESTSender::sendPayload(const std::string& url, const Direction direction, const C2Payload &payload, std::optional<std::string> data,
const std::optional<std::vector<std::string>>& accepted_formats) {
if (url.empty()) {
return {payload.getOperation(), state::UpdateState::READ_ERROR};
}
// Client declared last to make sure callbacks are still available when client is destructed
http::HTTPClient client(url, ssl_context_service_);
client.setKeepAliveProbe(http::KeepAliveProbeData{2s, 2s});
client.setConnectionTimeout(2s);
auto setUpHttpRequest = [&](http::HttpRequestMethod http_method) {
client.set_request_method(http_method);
if (url.find("https://") == 0) {
if (!ssl_context_service_) {
setSecurityContext(client, http_method, url);
} else {
client.initialize(http_method, url, ssl_context_service_);
}
}
};
if (direction == Direction::TRANSMIT) {
setUpHttpRequest(http::HttpRequestMethod::POST);
if (payload.getOperation() == Operation::transfer) {
// treat nested payloads as files
for (const auto& file : payload.getNestedPayloads()) {
std::string filename = file.getLabel();
if (filename.empty()) {
throw std::logic_error("Missing filename");
}
auto file_cb = std::make_unique<http::HTTPUploadByteArrayInputCallback>();
file_cb->write(file.getRawDataAsString());
client.addFormPart("application/octet-stream", "file", std::move(file_cb), filename);
}
} else {
auto data_input = std::make_unique<http::HTTPUploadByteArrayInputCallback>();
if (data && req_encoding_ == RequestEncoding::gzip) {
io::BufferStream compressed_payload;
bool compression_success = [&] {
io::ZlibCompressStream compressor(gsl::make_not_null(&compressed_payload), io::ZlibCompressionFormat::GZIP, Z_BEST_COMPRESSION);
auto ret = compressor.write(as_bytes(std::span(data.value())));
if (ret != data->length()) {
return false;
}
compressor.close();
return compressor.isFinished();
}();
if (compression_success) {
data_input->setBuffer(compressed_payload.moveBuffer());
client.setRequestHeader("Content-Encoding", "gzip");
} else {
logger_->log_error("Failed to compress request body, falling back to no compression");
data_input->write(data.value());
}
} else {
data_input->write(data.value_or(""));
}
client.setPostSize(data_input->getBufferSize());
client.setUploadCallback(std::move(data_input));
}
} else {
// we do not need to set the upload callback
// since we are not uploading anything on a get
setUpHttpRequest(http::HttpRequestMethod::GET);
}
if (payload.getOperation() == Operation::transfer) {
auto read = std::make_unique<http::HTTPReadCallback>(std::numeric_limits<size_t>::max());
client.setReadCallback(std::move(read));
if (accepted_formats && !accepted_formats->empty()) {
client.setRequestHeader("Accept", utils::string::join(", ", accepted_formats.value()));
}
} else {
// Due to a bug in MiNiFi C2 the Accept header is not handled properly thus we need to exclude it to be compatible
// TODO(lordgamez): The header should be re-added when the issue in MiNiFi C2 is fixed: https://issues.apache.org/jira/browse/NIFI-10535
// client.setRequestHeader("Accept", "application/json");
client.setContentType("application/json");
}
bool isOkay = client.submit();
int64_t respCode = client.getResponseCode();
const bool clientError = 400 <= respCode && respCode < 500;
const bool serverError = 500 <= respCode && respCode < 600;
if (clientError || serverError) {
logger_->log_error("Error response code '{}' from '{}'", respCode, url);
} else {
logger_->log_debug("Response code '{}' from '{}'", respCode, url);
}
const auto response_body_bytes = gsl::make_span(client.getResponseBody()).as_span<const std::byte>();
logger_->log_trace("Received response: \"{}\"", [&] { return utils::string::escapeUnprintableBytes(response_body_bytes); });
if (isOkay && !clientError && !serverError) {
if (accepted_formats) {
C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true);
response_payload.setRawData(response_body_bytes);
return response_payload;
}
return parseJsonResponse(payload, response_body_bytes);
} else {
return {payload.getOperation(), state::UpdateState::READ_ERROR};
}
}