extensions/http-curl/client/HTTPClient.cpp (436 lines of code) (raw):
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "HTTPClient.h"
#include <algorithm>
#include <cinttypes>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "Exception.h"
#include "utils/gsl.h"
#include "utils/StringUtils.h"
#include "core/Resource.h"
#include "utils/RegexUtils.h"
#include "range/v3/algorithm/all_of.hpp"
#include "range/v3/action/transform.hpp"
#include "utils/HTTPUtils.h"
#include "utils/Literals.h"
using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::extensions::curl {
// Builds a client for `url` using the supplied SSL context service (other methods
// treat a null service as "not configured") and allocates the libcurl easy handle
// that every later option call operates on.
HTTPClient::HTTPClient(std::string url, std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service)
    : core::Connectable("HTTPClient"),
      ssl_context_service_(std::move(ssl_context_service)),
      url_(std::move(url)) {
  CURL* const easy_handle = curl_easy_init();
  http_session_.reset(easy_handle);
}
// Builds a client registered under the given component name and identifier and
// allocates its libcurl easy handle. No URL is set by this overload.
HTTPClient::HTTPClient(std::string name, const utils::Identifier& uuid)
    : core::Connectable(std::move(name), uuid) {
  CURL* const easy_handle = curl_easy_init();
  http_session_.reset(easy_handle);
}
// Builds a client named "HTTPClient" with no URL and allocates its libcurl easy handle.
HTTPClient::HTTPClient()
    : core::Connectable("HTTPClient") {
  CURL* const easy_handle = curl_easy_init();
  http_session_.reset(easy_handle);
}
void HTTPClient::addFormPart(const std::string& content_type, const std::string& name, std::unique_ptr<utils::HTTPUploadCallback> form_callback, const std::optional<std::string>& filename) {
if (!form_) {
form_.reset(curl_mime_init(http_session_.get()));
}
form_callback_ = std::move(form_callback);
curl_mimepart* part = curl_mime_addpart(form_.get());
curl_mime_type(part, content_type.c_str());
if (filename) {
curl_mime_filename(part, filename->c_str());
}
curl_mime_name(part, name.c_str());
curl_mime_data_cb(part, gsl::narrow<curl_off_t>(form_callback_->size()),
&utils::HTTPRequestResponse::send_write, nullptr, nullptr, static_cast<void*>(form_callback_.get()));
}
// Tears the client down: asks any registered read/upload callbacks to stop,
// closes the internal response buffer and logs the URL this client was used for.
HTTPClient::~HTTPClient() {
// forceClose ended up not being the issue in MINIFICPP-667, but leaving here
// out of good hygiene.
forceClose();
content_.close();
logger_->log_trace("Closing HTTPClient for %s", url_);
}
// Signals both transfer callbacks, when present, that they should stop:
// the read callback through its `stop` flag, the upload callback through requestStop().
void HTTPClient::forceClose() {
  if (read_callback_) {
    read_callback_->stop = true;
  }
  if (write_callback_) {
    write_callback_->requestStop();
  }
}
// libcurl debug hook (installed by setVerbose). `userptr` is expected to point at the
// std::shared_ptr<Logger> registered via CURLOPT_DEBUGDATA. Only informational text
// records are forwarded to the logger at debug level; all other record types are dropped.
// Always returns 0.
int HTTPClient::debug_callback(CURL *handle, curl_infotype type, char *data, size_t size, void *userptr) {
  auto* const logger_ptr = static_cast<std::shared_ptr<core::logging::Logger>*>(userptr);
  const bool should_log = logger_ptr != nullptr && type == CURLINFO_TEXT;
  if (should_log) {
    core::logging::LOG_DEBUG(*logger_ptr) << "CURL(" << reinterpret_cast<void*>(handle) << "): " << std::string(data, size);
  }
  return 0;
}
void HTTPClient::setVerbose(bool use_stderr) {
curl_easy_setopt(http_session_.get(), CURLOPT_VERBOSE, 1L);
if (!use_stderr) {
curl_easy_setopt(http_session_.get(), CURLOPT_DEBUGDATA, &logger_);
curl_easy_setopt(http_session_.get(), CURLOPT_DEBUGFUNCTION, &debug_callback);
}
}
namespace {
// Returns true when `url` starts with the "https" scheme prefix.
// URI schemes are case-insensitive (RFC 3986, section 3.1), so the prefix is compared
// without regard to ASCII case: "HTTPS://host" must get the same TLS configuration as
// "https://host". The comparison is ASCII-only on purpose, independent of the locale.
bool isSecure(const std::string& url) {
  constexpr std::string_view secure_scheme = "https";
  if (url.size() < secure_scheme.size()) {
    return false;
  }
  const auto ascii_lower = [](char ch) {
    return (ch >= 'A' && ch <= 'Z') ? static_cast<char>(ch - 'A' + 'a') : ch;
  };
  return std::equal(secure_scheme.begin(), secure_scheme.end(), url.begin(),
      [&ascii_lower](char expected, char actual) { return expected == ascii_lower(actual); });
}
}  // namespace
// (Re)configures the client for a request.
//   method              - HTTP method; forwarded to set_request_method
//   url                 - target URL; an empty string keeps the current URL
//   ssl_context_service - replaces the current service only when non-null
// When the resulting URL is recognised as secure, the TLS options are applied as well.
void HTTPClient::initialize(std::string method, std::string url, std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service) {
  set_request_method(std::move(method));
  if (ssl_context_service != nullptr) {
    ssl_context_service_ = std::move(ssl_context_service);
  }
  if (!url.empty()) {
    url_ = std::move(url);
  }
  if (!isSecure(url_)) {
    return;
  }
  configure_secure_connection();
}
// Enables or disables verification of the peer's certificate.
// CURLOPT_SSL_VERIFYPEER is read by libcurl as a long; a bool passed through the
// variadic curl_easy_setopt would be promoted to int, which is not the type libcurl
// extracts, so the value is spelled out as a long literal.
void HTTPClient::setPeerVerification(bool peer_verification) {
  logger_->log_debug("%s peer verification", peer_verification ? "Enabling" : "Disabling");
  curl_easy_setopt(http_session_.get(), CURLOPT_SSL_VERIFYPEER, peer_verification ? 1L : 0L);
}
// Enables or disables verification that the certificate matches the host name.
// libcurl's "verify" value for CURLOPT_SSL_VERIFYHOST is 2, not 1: libcurl 7.28.1 to
// 7.65.3 reject the value 1 and leave the setting untouched, and only newer versions
// treat 1 like 2. The option is also read as a long, so long literals are passed.
void HTTPClient::setHostVerification(bool host_verification) {
  logger_->log_debug("%s host verification", host_verification ? "Enabling" : "Disabling");
  curl_easy_setopt(http_session_.get(), CURLOPT_SSL_VERIFYHOST, host_verification ? 2L : 0L);
}
void HTTPClient::setBasicAuth(const std::string& username, const std::string& password) {
curl_easy_setopt(http_session_.get(), CURLOPT_USERNAME, username.c_str());
curl_easy_setopt(http_session_.get(), CURLOPT_PASSWORD, password.c_str());
}
void HTTPClient::clearBasicAuth() {
curl_easy_setopt(http_session_.get(), CURLOPT_USERNAME, nullptr);
curl_easy_setopt(http_session_.get(), CURLOPT_PASSWORD, nullptr);
}
// Pins the TLS protocol to exactly `specific_version` (minimum == maximum).
// With OpenSSL support the same bounds are also applied to the SSL context service,
// when one is set. Returns true only if libcurl accepted the option; returns false for
// an unhandled version value or when libcurl is older than 7.54.0.
bool HTTPClient::setSpecificSSLVersion(utils::SSLVersion specific_version) {
#ifdef OPENSSL_SUPPORT
  if (ssl_context_service_) {
    switch (specific_version) {
      case utils::SSLVersion::TLSv1_0: {
        ssl_context_service_->setMinTlsVersion(TLS1_VERSION);
        ssl_context_service_->setMaxTlsVersion(TLS1_VERSION);
        break;
      }
      case utils::SSLVersion::TLSv1_1: {
        ssl_context_service_->setMinTlsVersion(TLS1_1_VERSION);
        ssl_context_service_->setMaxTlsVersion(TLS1_1_VERSION);
        break;
      }
      case utils::SSLVersion::TLSv1_2: {
        ssl_context_service_->setMinTlsVersion(TLS1_2_VERSION);
        ssl_context_service_->setMaxTlsVersion(TLS1_2_VERSION);
        break;
      }
      default: break;
    }
  }
#endif
#if CURL_AT_LEAST_VERSION(7, 54, 0)
  // The curl api explicitly supports ORing one of CURL_SSLVERSION and one of CURL_SSLVERSION_MAX.
  // Both operands are converted to long first: that is the type libcurl reads for
  // CURLOPT_SSLVERSION, and it avoids the bitwise OR of different enum types (deprecated in C++20).
  const auto pin_curl_version = [this](long min_version, long max_version) {  // NOLINT(runtime/int) long due to libcurl API
    return CURLE_OK == curl_easy_setopt(http_session_.get(), CURLOPT_SSLVERSION, min_version | max_version);
  };
  switch (specific_version) {
    case utils::SSLVersion::TLSv1_0:
      return pin_curl_version(CURL_SSLVERSION_TLSv1_0, CURL_SSLVERSION_MAX_TLSv1_0);
    case utils::SSLVersion::TLSv1_1:
      return pin_curl_version(CURL_SSLVERSION_TLSv1_1, CURL_SSLVERSION_MAX_TLSv1_1);
    case utils::SSLVersion::TLSv1_2:
      return pin_curl_version(CURL_SSLVERSION_TLSv1_2, CURL_SSLVERSION_MAX_TLSv1_2);
    default: return false;
  }
#else
  return false;
#endif
}
// If not set, the default will be TLS 1.0, see https://curl.haxx.se/libcurl/c/CURLOPT_SSLVERSION.html
//
// Sets the lowest TLS version the client accepts. With OpenSSL support the minimum is
// also applied to the SSL context service, when one is set. Returns true only if
// libcurl accepted the option; an unhandled version value yields false.
bool HTTPClient::setMinimumSSLVersion(utils::SSLVersion minimum_version) {
#ifdef OPENSSL_SUPPORT
  if (ssl_context_service_) {
    switch (minimum_version) {
      case utils::SSLVersion::TLSv1_0: {
        ssl_context_service_->setMinTlsVersion(TLS1_VERSION);
        break;
      }
      case utils::SSLVersion::TLSv1_1: {
        ssl_context_service_->setMinTlsVersion(TLS1_1_VERSION);
        break;
      }
      case utils::SSLVersion::TLSv1_2: {
        ssl_context_service_->setMinTlsVersion(TLS1_2_VERSION);
        break;
      }
      default: break;
    }
  }
#endif
  // CURLOPT_SSLVERSION is read by libcurl as a long, so the enumerator is converted
  // before it goes through the variadic curl_easy_setopt.
  long curl_minimum_version = CURL_SSLVERSION_DEFAULT;  // NOLINT(runtime/int) long due to libcurl API
  switch (minimum_version) {
    case utils::SSLVersion::TLSv1_0:
      curl_minimum_version = CURL_SSLVERSION_TLSv1_0;
      break;
    case utils::SSLVersion::TLSv1_1:
      curl_minimum_version = CURL_SSLVERSION_TLSv1_1;
      break;
    case utils::SSLVersion::TLSv1_2:
      curl_minimum_version = CURL_SSLVERSION_TLSv1_2;
      break;
    default:
      return false;  // nothing was passed to libcurl
  }
  return CURLE_OK == curl_easy_setopt(http_session_.get(), CURLOPT_SSLVERSION, curl_minimum_version);
}
void HTTPClient::setKeepAliveProbe(std::optional<KeepAliveProbeData> probe_data) {
if (probe_data) {
curl_easy_setopt(http_session_.get(), CURLOPT_TCP_KEEPALIVE, true);
curl_easy_setopt(http_session_.get(), CURLOPT_TCP_KEEPINTVL, probe_data->keep_alive_interval.count());
curl_easy_setopt(http_session_.get(), CURLOPT_TCP_KEEPIDLE, probe_data->keep_alive_delay.count());
} else {
curl_easy_setopt(http_session_.get(), CURLOPT_TCP_KEEPALIVE, false);
}
}
// Stores the connection timeout applied by submit(). Negative values are rejected
// with an error log and leave the current timeout unchanged.
void HTTPClient::setConnectionTimeout(std::chrono::milliseconds timeout) {
  const bool is_valid = !(timeout < 0ms);
  if (is_valid) {
    connect_timeout_ = timeout;
    return;
  }
  logger_->log_error("Invalid HTTP connection timeout %" PRId64 " ms", timeout.count());
}
// Stores the read (idle) timeout used by submit() and onProgress(). Negative values are
// rejected with an error log and leave the current timeout unchanged.
void HTTPClient::setReadTimeout(std::chrono::milliseconds timeout) {
  const bool is_valid = !(timeout < 0ms);
  if (is_valid) {
    read_timeout_ = timeout;
    return;
  }
  logger_->log_error("Invalid HTTP read timeout %" PRId64 " ms", timeout.count());
}
// Takes ownership of `callback` and registers it as the destination of the response
// body: libcurl hands received data to HTTPRequestResponse::receiveWrite together with
// a pointer to the stored callback.
void HTTPClient::setReadCallback(std::unique_ptr<utils::HTTPReadCallback> callback) {
  read_callback_ = std::move(callback);
  CURL* const session = http_session_.get();
  auto* const sink = static_cast<void*>(read_callback_.get());
  curl_easy_setopt(session, CURLOPT_WRITEFUNCTION, &utils::HTTPRequestResponse::receiveWrite);
  curl_easy_setopt(session, CURLOPT_WRITEDATA, sink);
}
// Takes ownership of `callback` and registers it as the source of the request body.
// For PUT requests the total upload size is announced to libcurl as well. The same
// callback object serves as context for both the read and the seek function.
void HTTPClient::setUploadCallback(std::unique_ptr<utils::HTTPUploadCallback> callback) {
  logger_->log_debug("Setting callback for %s", url_);
  write_callback_ = std::move(callback);
  CURL* const session = http_session_.get();
  if (method_ == "PUT") {
    const auto upload_size = static_cast<curl_off_t>(write_callback_->size());
    curl_easy_setopt(session, CURLOPT_INFILESIZE_LARGE, upload_size);
  }
  auto* const source = static_cast<void*>(write_callback_.get());
  curl_easy_setopt(session, CURLOPT_READFUNCTION, &utils::HTTPRequestResponse::send_write);
  curl_easy_setopt(session, CURLOPT_READDATA, source);
  curl_easy_setopt(session, CURLOPT_SEEKDATA, source);
  curl_easy_setopt(session, CURLOPT_SEEKFUNCTION, &utils::HTTPRequestResponse::seek_callback);
}
// Sets (or replaces) the "Content-Type" request header.
void HTTPClient::setContentType(std::string content_type) {
  request_headers_.insert_or_assign("Content-Type", std::move(content_type));
}
std::string HTTPClient::escape(std::string string_to_escape) {
struct curl_deleter { void operator()(void* p) noexcept { curl_free(p); } };
std::unique_ptr<char, curl_deleter> escaped_chars{curl_easy_escape(http_session_.get(), string_to_escape.c_str(), gsl::narrow<int>(string_to_escape.length()))};
std::string escaped_string(escaped_chars.get());
return escaped_string;
}
// Uses `input` as the POST body. The size is set before CURLOPT_COPYPOSTFIELDS so that
// libcurl copies exactly that many bytes; libcurl keeps its own copy of the data.
void HTTPClient::setPostFields(const std::string& input) {
  const auto body_length = input.length();
  setPostSize(body_length);
  curl_easy_setopt(http_session_.get(), CURLOPT_COPYPOSTFIELDS, input.c_str());
}
void HTTPClient::setPostSize(size_t size) {
if (size > 2_GB) {
curl_easy_setopt(http_session_.get(), CURLOPT_POSTFIELDSIZE_LARGE, size);
} else {
curl_easy_setopt(http_session_.get(), CURLOPT_POSTFIELDSIZE, size);
}
}
// Routes requests through `proxy`. Nothing is configured when the proxy host is empty;
// proxy credentials are only configured when a user name is present.
void HTTPClient::setHTTPProxy(const utils::HTTPProxy &proxy) {
  if (proxy.host.empty()) {
    return;
  }
  CURL* const session = http_session_.get();
  curl_easy_setopt(session, CURLOPT_PROXY, proxy.host.c_str());
  // CURLOPT_PROXYPORT is read by libcurl as a long; convert explicitly so the variadic
  // argument has exactly that type.
  curl_easy_setopt(session, CURLOPT_PROXYPORT, static_cast<long>(proxy.port));  // NOLINT(runtime/int) long due to libcurl API
  if (proxy.username.empty()) {
    return;
  }
  curl_easy_setopt(session, CURLOPT_PROXYAUTH, CURLAUTH_ANY);
  const std::string credentials = proxy.username + ":" + proxy.password;
  curl_easy_setopt(session, CURLOPT_PROXYUSERPWD, credentials.c_str());
}
// Sets request header `key` to `value`, or removes the header when `value` is empty.
void HTTPClient::setRequestHeader(std::string key, std::optional<std::string> value) {
  if (!value.has_value()) {
    request_headers_.erase(key);
    return;
  }
  request_headers_[std::move(key)] = std::move(*value);
}
namespace {
// Deleter that releases a whole curl_slist chain.
struct CurlSListFreeAll {
  void operator()(struct curl_slist* slist) const {
    curl_slist_free_all(slist);
  }
};

// Converts the header map into a curl_slist of "Key: Value" lines.
// Returns an empty pointer when no header could be added.
// curl_slist_append returns NULL on failure and leaves the existing list untouched, so
// the list stays owned by the smart pointer the whole time: a failed append skips that
// one header instead of overwriting the head with NULL and leaking the earlier entries.
std::unique_ptr<struct curl_slist, CurlSListFreeAll> getCurlSList(const std::unordered_map<std::string, std::string>& request_headers) {
  std::unique_ptr<struct curl_slist, CurlSListFreeAll> header_list;
  for (const auto& [header_key, header_value] : request_headers) {
    const auto header_line = utils::StringUtils::join_pack(header_key, ": ", header_value);
    if (curl_slist* const extended = curl_slist_append(header_list.get(), header_line.c_str())) {
      // `extended` is the head of the (possibly new) list; hand ownership over without freeing it.
      static_cast<void>(header_list.release());
      header_list.reset(extended);
    }
  }
  return header_list;
}
}  // namespace
bool HTTPClient::submit() {
if (url_.empty()) {
logger_->log_error("Tried to submit to an empty url");
return false;
}
response_data_.clear();
curl_easy_setopt(http_session_.get(), CURLOPT_NOSIGNAL, 1);
curl_easy_setopt(http_session_.get(), CURLOPT_CONNECTTIMEOUT_MS, connect_timeout_.count());
curl_easy_setopt(http_session_.get(), CURLOPT_TIMEOUT_MS, getAbsoluteTimeout().count());
if (read_timeout_ > 0ms) {
progress_.reset();
curl_easy_setopt(http_session_.get(), CURLOPT_NOPROGRESS, 0);
curl_easy_setopt(http_session_.get(), CURLOPT_XFERINFOFUNCTION, onProgress);
curl_easy_setopt(http_session_.get(), CURLOPT_XFERINFODATA, this);
} else {
// the user explicitly set it to 0
curl_easy_setopt(http_session_.get(), CURLOPT_NOPROGRESS, 1);
}
auto headers = getCurlSList(request_headers_);
if (headers) {
curl_slist_append(headers.get(), "Expect:");
curl_easy_setopt(http_session_.get(), CURLOPT_HTTPHEADER, headers.get());
}
curl_easy_setopt(http_session_.get(), CURLOPT_URL, url_.c_str());
logger_->log_debug("Submitting to %s", url_);
if (read_callback_ == nullptr) {
curl_easy_setopt(http_session_.get(), CURLOPT_WRITEFUNCTION, &utils::HTTPRequestResponse::receiveWrite);
curl_easy_setopt(http_session_.get(), CURLOPT_WRITEDATA, static_cast<void*>(&content_));
}
curl_easy_setopt(http_session_.get(), CURLOPT_HEADERFUNCTION, &utils::HTTPHeaderResponse::receive_headers);
curl_easy_setopt(http_session_.get(), CURLOPT_HEADERDATA, static_cast<void*>(&response_data_.header_response));
if (form_ != nullptr) {
curl_easy_setopt(http_session_.get(), CURLOPT_MIMEPOST, form_.get());
}
res_ = curl_easy_perform(http_session_.get());
if (read_callback_ == nullptr) {
content_.close();
}
long http_code; // NOLINT(runtime/int) long due to libcurl API
curl_easy_getinfo(http_session_.get(), CURLINFO_RESPONSE_CODE, &http_code);
response_data_.response_code = http_code;
curl_easy_getinfo(http_session_.get(), CURLINFO_CONTENT_TYPE, &response_data_.response_content_type);
if (res_ == CURLE_OPERATION_TIMEDOUT) {
logger_->log_error("HTTP operation timed out, with absolute timeout %" PRId64 "ms\n", getAbsoluteTimeout().count());
}
if (res_ != CURLE_OK) {
logger_->log_info("%d", request_headers_.size());
logger_->log_error("curl_easy_perform() failed %s on %s, error code %d\n", curl_easy_strerror(res_), url_, res_);
return false;
}
logger_->log_debug("Finished with %s", url_);
return true;
}
// Returns the response code recorded in response_data_ by submit().
int64_t HTTPClient::getResponseCode() const {
  return static_cast<int64_t>(response_data_.response_code);
}
// Returns the content type pointer that submit() obtained from libcurl
// (CURLINFO_CONTENT_TYPE) and stored in response_data_.
const char *HTTPClient::getContentType() {
  const char* const content_type = response_data_.response_content_type;
  return content_type;
}
// Returns the response body. While the stored body is empty it is (re)filled from the
// user supplied read callback when one is registered, otherwise from the internal buffer.
const std::vector<char> &HTTPClient::getResponseBody() {
  auto& body = response_data_.response_body;
  if (!body.empty()) {
    return body;
  }
  if (read_callback_) {
    body = read_callback_->to_string();
  } else {
    body = content_.to_string();
  }
  return body;
}
void HTTPClient::set_request_method(std::string method) {
ranges::actions::transform(method, [](auto ch) { return ::toupper(static_cast<unsigned char>(ch)); });
if (method_ == method)
return;
method_ = std::move(method);
if (method_ == "POST") {
curl_easy_setopt(http_session_.get(), CURLOPT_POST, 1L);
curl_easy_setopt(http_session_.get(), CURLOPT_CUSTOMREQUEST, nullptr);
} else if (method_ == "HEAD") {
curl_easy_setopt(http_session_.get(), CURLOPT_NOBODY, 1L);
curl_easy_setopt(http_session_.get(), CURLOPT_CUSTOMREQUEST, nullptr);
} else if (method_ == "GET") {
curl_easy_setopt(http_session_.get(), CURLOPT_HTTPGET, 1L);
curl_easy_setopt(http_session_.get(), CURLOPT_CUSTOMREQUEST, nullptr);
} else if (method_ == "PUT") {
curl_easy_setopt(http_session_.get(), CURLOPT_UPLOAD, 1L);
curl_easy_setopt(http_session_.get(), CURLOPT_CUSTOMREQUEST, nullptr);
} else {
curl_easy_setopt(http_session_.get(), CURLOPT_POST, 0L);
curl_easy_setopt(http_session_.get(), CURLOPT_NOBODY, 0L);
curl_easy_setopt(http_session_.get(), CURLOPT_HTTPGET, 0L);
curl_easy_setopt(http_session_.get(), CURLOPT_UPLOAD, 0L);
curl_easy_setopt(http_session_.get(), CURLOPT_CUSTOMREQUEST, method_.c_str());
}
}
// libcurl transfer-info hook implementing the read (idle) timeout. `clientp` is the
// HTTPClient registered via CURLOPT_XFERINFODATA.
// Returns 0 while the transfer may continue. Returns 1 once neither byte counter has
// changed for longer than read_timeout_, which makes libcurl abort the transfer.
int HTTPClient::onProgress(void *clientp, curl_off_t /*dltotal*/, curl_off_t dlnow, curl_off_t /*ultotal*/, curl_off_t ulnow) {
  auto* const self = reinterpret_cast<HTTPClient*>(clientp);
  auto& progress = self->progress_;
  const auto current_time = std::chrono::steady_clock::now();
  const auto idle_time = current_time - progress.last_transferred_;

  const bool counters_changed = dlnow != progress.downloaded_data_ || ulnow != progress.uploaded_data_;
  if (counters_changed) {
    // data was transferred since the last call: restart the idle clock
    progress.last_transferred_ = current_time;
    progress.downloaded_data_ = dlnow;
    progress.uploaded_data_ = ulnow;
    return 0;
  }
  if (!(idle_time > self->read_timeout_)) {
    // idle, but still within the limit
    return 0;
  }
  self->logger_->log_error("HTTP operation has been idle for %" PRId64 " ms, limit (%" PRId64 "ms) reached, terminating connection\n",
      int64_t{std::chrono::duration_cast<std::chrono::milliseconds>(idle_time).count()},
      int64_t{self->read_timeout_.count()});
  return 1;
}
// Applies the TLS settings to the curl handle (only compiled in with OpenSSL support).
// With an SSL context service the context is configured through configure_ssl_context
// and the CA file/path options are cleared. Without one the SSL context hook is cleared
// and the default CA file, looked up once per process, is used when it was found.
void HTTPClient::configure_secure_connection() {
#ifdef OPENSSL_SUPPORT
  CURL* const session = http_session_.get();
  if (ssl_context_service_) {
    logger_->log_debug("Using certificate file \"%s\"", ssl_context_service_->getCertificateFile().string());
    logger_->log_debug("Using private key file \"%s\"", ssl_context_service_->getPrivateKeyFile().string());
    logger_->log_debug("Using CA certificate file \"%s\"", ssl_context_service_->getCACertificate().string());
    curl_easy_setopt(session, CURLOPT_SSL_CTX_FUNCTION, &configure_ssl_context);
    curl_easy_setopt(session, CURLOPT_SSL_CTX_DATA, static_cast<void *>(ssl_context_service_.get()));
    curl_easy_setopt(session, CURLOPT_CAINFO, nullptr);
    curl_easy_setopt(session, CURLOPT_CAPATH, nullptr);
    return;
  }

  static const auto default_ca_path = utils::getDefaultCAPath();
  if (default_ca_path) {
    logger_->log_debug("Using CA certificate file \"%s\"", default_ca_path->string());
  } else {
    logger_->log_error("Could not find valid CA certificate file");
  }
  curl_easy_setopt(session, CURLOPT_SSL_CTX_FUNCTION, nullptr);
  curl_easy_setopt(session, CURLOPT_SSL_CTX_DATA, nullptr);
  if (default_ca_path) {
    curl_easy_setopt(session, CURLOPT_CAINFO, default_ca_path->string().c_str());
  } else {
    curl_easy_setopt(session, CURLOPT_CAINFO, nullptr);
  }
  curl_easy_setopt(session, CURLOPT_CAPATH, nullptr);
#endif
}
void HTTPClient::setInterface(const std::string &ifc) {
curl_easy_setopt(http_session_.get(), CURLOPT_INTERFACE, ifc.c_str());
}
// Enables or disables following of HTTP redirects.
// CURLOPT_FOLLOWLOCATION is read by libcurl as a long; a bool passed through the
// variadic curl_easy_setopt would be promoted to int, so a long literal is passed.
void HTTPClient::setFollowRedirects(bool follow) {
  curl_easy_setopt(http_session_.get(), CURLOPT_FOLLOWLOCATION, follow ? 1L : 0L);
}
// Tells whether `field_name` can be used as an HTTP header field name.
// An empty name is invalid.
bool HTTPClient::isValidHttpHeaderField(std::string_view field_name) {
  // RFC822 3.1.2: The field-name must be composed of printable ASCII characters
  // (i.e., characters that have values between 33. and 126., decimal, except colon).
  const auto is_allowed = [](char c) { return c >= 33 && c <= 126 && c != ':'; };
  return !field_name.empty() && ranges::all_of(field_name, is_allowed);
}
// Returns `field_name` with every character that is not allowed in an HTTP header
// field name replaced by '-'. An empty name is replaced by a fixed placeholder.
// The replacement happens in place on the by-value parameter, so no second buffer
// is needed.
std::string HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName(std::string field_name) {
  if (field_name.empty()) {
    return "X-MiNiFi-Empty-Attribute-Name";
  }
  // RFC822 3.1.2: The field-name must be composed of printable ASCII characters
  // (i.e., characters that have values between 33. and 126., decimal, except colon).
  ranges::actions::transform(field_name, [](char ch) {
    return (ch >= 33 && ch <= 126 && ch != ':') ? ch : '-';
  });
  return field_name;
}
void HTTPClient::CurlEasyCleanup::operator()(CURL* curl) const {
curl_easy_cleanup(curl);
}
void HTTPClient::CurlMimeFree::operator()(curl_mime* curl_mime) const {
curl_mime_free(curl_mime);
}
// Registers HTTPClient with the resource registration macro, tagged as InternalResource.
REGISTER_RESOURCE(HTTPClient, InternalResource);
} // namespace org::apache::nifi::minifi::extensions::curl