extensions/http-curl/client/HTTPClient.h (160 lines of code) (raw):

/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include "utils/BaseHTTPClient.h" #ifdef WIN32 #pragma comment(lib, "wldap32.lib" ) #pragma comment(lib, "crypt32.lib" ) #pragma comment(lib, "Ws2_32.lib") #define CURL_STATICLIB #include <curl/curl.h> #else #include <curl/curl.h> #endif #include <curl/easy.h> #include <chrono> #include <limits> #include <map> #include <memory> #include <string> #include <string_view> #include <unordered_map> #include <utility> #include <vector> #include "utils/ByteArrayCallback.h" #include "controllers/SSLContextService.h" #include "core/logging/Logger.h" #include "core/logging/LoggerConfiguration.h" namespace org::apache::nifi::minifi::extensions::curl { struct KeepAliveProbeData { std::chrono::seconds keep_alive_delay; std::chrono::seconds keep_alive_interval; }; struct HTTPResponseData { std::vector<char> response_body; utils::HTTPHeaderResponse header_response; char* response_content_type{nullptr}; int64_t response_code{0}; void clear() { header_response.clear(); response_body.clear(); response_content_type = nullptr; response_code = 0; } }; class HTTPClient : public utils::BaseHTTPClient, public core::Connectable { public: HTTPClient(); HTTPClient(std::string name, const utils::Identifier& uuid); HTTPClient(const HTTPClient&) = delete; HTTPClient& operator=(const HTTPClient&) = delete; explicit HTTPClient(std::string url, std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr); ~HTTPClient() override; EXTENSIONAPI static constexpr auto Properties = std::array<core::PropertyReference, 0>{}; EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; static int debug_callback(CURL *handle, curl_infotype type, char *data, size_t size, void *userptr); void setVerbose(bool use_stderr) override; void addFormPart(const std::string& content_type, const std::string& name, std::unique_ptr<utils::HTTPUploadCallback> form_callback, const std::optional<std::string>& filename); void forceClose(); void initialize(std::string method, std::string url, std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service) override; void setConnectionTimeout(std::chrono::milliseconds timeout) override; void setReadTimeout(std::chrono::milliseconds timeout) override; void setUploadCallback(std::unique_ptr<utils::HTTPUploadCallback> callback) override; void setReadCallback(std::unique_ptr<utils::HTTPReadCallback> callback); utils::HTTPUploadCallback* getUploadCallback() const { return write_callback_.get(); } utils::HTTPReadCallback* getReadCallback() const { return read_callback_.get(); } void setContentType(std::string content_type) override; std::string escape(std::string string_to_escape) override; void setPostFields(const std::string& input) override; void setRequestHeader(std::string key, std::optional<std::string> value) override; bool submit() override; int64_t getResponseCode() const override; const char *getContentType() override; const std::vector<char>& getResponseBody() override; void set_request_method(std::string method) override; std::string& getRequestMethod() { return method_; } void setPeerVerification(bool peer_verification) override; void setHostVerification(bool host_verification) override; void setBasicAuth(const std::string& username, const std::string& password) override; void clearBasicAuth() override; bool setSpecificSSLVersion(utils::SSLVersion specific_version) override; bool setMinimumSSLVersion(utils::SSLVersion minimum_version) override; void setKeepAliveProbe(std::optional<KeepAliveProbeData> probe_data); const std::string& getURL() const { return url_; } const std::vector<std::string>& getResponseHeaders() override { return response_data_.header_response.getHeaderLines(); } const std::map<std::string, std::string>& getResponseHeaderMap() override { return response_data_.header_response.getHeaderMap(); } void setInterface(const std::string &); void setFollowRedirects(bool follow); /** * Locates the header value ignoring case. This is different than returning a mapping * of all parsed headers. * This function acknowledges that header entries should be searched case insensitively. * @param key key to search * @return header value. */ std::string getHeaderValue(const std::string &key) { std::string ret; for (const auto &kv : response_data_.header_response.getHeaderMap()) { if (utils::StringUtils::equalsIgnoreCase(key, kv.first)) { ret = kv.second; break; } } return ret; } /** * Determines if we are connected and operating */ bool isRunning() const override { return true; } void yield() override { } bool isWorkAvailable() override { return true; } void setPostSize(size_t size); void setHTTPProxy(const utils::HTTPProxy &proxy) override; static bool isValidHttpHeaderField(std::string_view field_name); static std::string replaceInvalidCharactersInHttpHeaderFieldName(std::string field_name); private: static int onProgress(void *client, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow); struct Progress{ std::chrono::steady_clock::time_point last_transferred_; curl_off_t uploaded_data_{}; curl_off_t downloaded_data_{}; void reset(){ last_transferred_ = std::chrono::steady_clock::now(); uploaded_data_ = 0; downloaded_data_ = 0; } }; Progress progress_; protected: static CURLcode configure_ssl_context(CURL* /*curl*/, void *ctx, void *param) { #ifdef OPENSSL_SUPPORT auto* ssl_context_service = static_cast<minifi::controllers::SSLContextService*>(param); if (!ssl_context_service->configure_ssl_context(static_cast<SSL_CTX*>(ctx))) { return CURLE_FAILED_INIT; } return CURLE_OK; #else return CURLE_FAILED_INIT; #endif } void configure_secure_connection(); std::chrono::milliseconds getAbsoluteTimeout() const { return 3*read_timeout_; } utils::HTTPReadCallback content_{std::numeric_limits<size_t>::max()}; std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_; std::string url_; std::string method_; std::chrono::milliseconds connect_timeout_{std::chrono::seconds(30)}; std::chrono::milliseconds read_timeout_{std::chrono::seconds(30)}; HTTPResponseData response_data_; CURLcode res_{CURLE_OK}; std::unordered_map<std::string, std::string> request_headers_; struct CurlEasyCleanup { void operator()(CURL* curl) const; }; struct CurlMimeFree { void operator()(curl_mime* curl_mime) const; }; std::unique_ptr<CURL, CurlEasyCleanup> http_session_; std::unique_ptr<curl_mime, CurlMimeFree> form_; std::unique_ptr<utils::HTTPReadCallback> read_callback_; std::unique_ptr<utils::HTTPUploadCallback> write_callback_; std::unique_ptr<utils::HTTPUploadCallback> form_callback_; std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<HTTPClient>::getLogger()}; }; } // namespace org::apache::nifi::minifi::extensions::curl