in extensions/standard-processors/processors/InvokeHTTP.cpp [276:370]
// Performs one HTTP exchange for `flow_file` using `client`, then hands the result to route().
//
// Sequence:
//  1. If shouldEmitFlowFile() is true and the flow file's content stream can be obtained, install an
//     upload callback on the client and set the Content-Length header (and post size) as configured.
//     The upload callback is reset on the client whenever this function exits.
//  2. Set the "Date" request header to the current time (seconds precision), or pass std::nullopt as
//     its value when send_date_header_ is false.
//  3. Let appendHeaders() push additional request headers onto the client; if it returns false the
//     flow file is transferred to RelFailure and no request is submitted.
//  4. Submit the request. If submit() returns false the flow file is penalized and transferred to
//     RelFailure. Otherwise status code, status line, request URL and a freshly generated transaction
//     id are added as attributes to the flow file; for a 2xx status the response body is either
//     imported into a flow file created from `flow_file`, or stored in the attribute named by
//     put_response_body_in_attribute_. Finally route() is called with both flow files.
void InvokeHTTP::onTriggerWithClient(core::ProcessContext& context, core::ProcessSession& session,
    const std::shared_ptr<core::FlowFile>& flow_file, minifi::http::HTTPClient& client) {
  logger_->log_debug("onTrigger InvokeHTTP with {} to {}", magic_enum::enum_name(method_), client.getURL());

  // Runs on every exit path: resets the upload callback on the client.
  const auto reset_upload_callback_on_exit = gsl::finally([&client] {
    client.setUploadCallback({});
  });

  std::string txn_id = utils::IdGenerator::getIdGenerator()->generate().to_string();

  if (!shouldEmitFlowFile()) {
    logger_->log_trace("InvokeHTTP -- Not emitting flowfile to HTTP Server");
  } else {
    logger_->log_trace("InvokeHTTP -- reading flowfile");
    const auto content_stream = session.getFlowFileContentStream(*flow_file);
    if (!content_stream) {
      // Only logged; the request is still submitted further down.
      logger_->log_error("InvokeHTTP -- no resource claim");
    } else {
      std::unique_ptr<http::HTTPUploadCallback> upload_callback;
      if (send_message_body_) {
        upload_callback = std::make_unique<http::HTTPUploadStreamContentsCallback>(content_stream);
      } else {
        upload_callback = std::make_unique<http::HTTPUploadByteArrayInputCallback>();
      }
      client.setUploadCallback(std::move(upload_callback));
      logger_->log_trace("InvokeHTTP -- Setting callback, size is {}", flow_file->getSize());

      if (send_message_body_) {
        // With chunked encoding enabled no Content-Length / post size is set here.
        if (!use_chunked_encoding_) {
          client.setRequestHeader("Content-Length", std::to_string(flow_file->getSize()));
          client.setPostSize(flow_file->getSize());
        }
      } else {
        client.setRequestHeader("Content-Length", "0");
      }
    }
  }

  if (!send_date_header_) {
    client.setRequestHeader("Date", std::nullopt);
  } else {
    auto now_seconds = std::chrono::floor<std::chrono::seconds>(std::chrono::system_clock::now());
    client.setRequestHeader("Date", utils::timeutils::getRFC2616Format(now_seconds));
  }

  const auto set_header_on_client = [&](const std::string& key, const std::string& value) { client.setRequestHeader(key, value); };
  if (!appendHeaders(*flow_file, set_header_on_client)) {
    session.transfer(flow_file, RelFailure);
    return;
  }

  // Note: this trace line is emitted before the request is actually submitted.
  logger_->log_trace("InvokeHTTP -- curl performed");
  if (!client.submit()) {
    session.penalize(flow_file);
    session.transfer(flow_file, RelFailure);
    return;
  }
  logger_->log_trace("InvokeHTTP -- curl successful");

  const std::vector<char>& body = client.getResponseBody();
  const std::vector<std::string>& header_lines = client.getResponseHeaders();
  const int64_t status_code = client.getResponseCode();
  const char* mime = client.getContentType();

  // Writes the response metadata shared by the request flow file and the response flow file.
  const auto stamp_response_attributes = [&](core::FlowFile& target) {
    target.addAttribute(STATUS_CODE, std::to_string(status_code));
    // The first response header line is used as the status message.
    if (!header_lines.empty()) { target.addAttribute(STATUS_MESSAGE, utils::string::trim(header_lines.front())); }
    target.addAttribute(REQUEST_URL, client.getURL());
    target.addAttribute(TRANSACTION_ID, txn_id);
  };

  stamp_response_attributes(*flow_file);

  const bool succeeded = (status_code >= 200 && status_code < 300);
  logger_->log_debug("isSuccess: {}, response code {}", succeeded, status_code);

  std::shared_ptr<core::FlowFile> response_flow_file;
  if (succeeded) {
    if (put_response_body_in_attribute_) {
      // Body goes into an attribute of the request flow file; an empty body adds nothing.
      if (!body.empty()) {
        std::string body_text(body.begin(), body.end());
        flow_file->addAttribute(*put_response_body_in_attribute_, body_text);
      }
    } else {
      response_flow_file = session.create(flow_file.get());
      // if content type isn't returned we should return application/octet-stream
      // as per RFC 2046 -- 4.5.1
      response_flow_file->addAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime ? std::string(mime) : DefaultContentType);
      stamp_response_attributes(*response_flow_file);
      // The response body becomes the content of the response flow file.
      io::BufferStream body_stream(gsl::make_span(body).as_span<const std::byte>());
      session.importFrom(body_stream, response_flow_file);
    }
  }

  route(flow_file, response_flow_file, session, context, succeeded, status_code);
}