prod/native/libcommon/code/transport/HttpTransportAsync.h (169 lines of code) (raw):
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include "ForkableInterface.h"
#include "ConfigurationStorage.h"
#include "CurlSender.h"
#include "HttpEndpoints.h"
#include "SpinLock.h"
#include "CommonUtils.h"
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <exception>
#include <memory>
#include <mutex>
#include <queue>
#include <span>
#include <stdexcept>
#include <string>
#include <string_view>
#include <thread>
#include <utility>
#include <vector>
#include <boost/container_hash/hash.hpp>
#include <boost/core/noncopyable.hpp>
#include <curl/curl.h>
using namespace std::literals;
namespace elasticapm::php::transport {
template <typename CurlSender = CurlSender, typename Endpoints = HttpEndpoints>
class HttpTransportAsync : public ForkableInterface, public boost::noncopyable {
using endpointUrlHash_t = Endpoints::endpointUrlHash_t;
public:
HttpTransportAsync(std::shared_ptr<LoggerInterface> log, std::shared_ptr<ConfigurationStorage> config) : log_(std::move(log)), config_(std::move(config)), endpoints_(log_) {
CurlInit();
}
~HttpTransportAsync() {
shutdownStart_ = std::chrono::steady_clock::now();
forceFlushOnDestruction_ = true;
shutdownThread();
CurlCleanup();
}
void initializeConnection(std::string endpointUrl, size_t endpointHash, std::string contentType, HttpEndpoint::enpointHeaders_t const &endpointHeaders, std::chrono::milliseconds timeout, std::size_t maxRetries, std::chrono::milliseconds retryDelay) {
ELOGF_DEBUG(log_, TRANSPORT, "HttpTransportAsync::initializeConnection endpointUrl '%s' enpointHash: %X timeout: %zums retries: %zu retry delay: %zums", endpointUrl.c_str(), endpointHash, timeout.count(), maxRetries, retryDelay.count());
try {
endpoints_.add(std::move(endpointUrl), endpointHash, config_->get().verify_server_cert, std::move(contentType), endpointHeaders, timeout, maxRetries, retryDelay);
startThread();
} catch (std::exception const &error) {
ELOGF_ERROR(log_, TRANSPORT, "HttpTransportAsync::initializeConnection exception '%s'", error.what());
}
}
void enqueue(size_t endpointHash, std::span<std::byte> payload) {
{
std::lock_guard<std::mutex> lock(mutex_);
ELOGF_TRACE(log_, TRANSPORT, "HttpTransportAsync::enqueue enpointHash: %X payload size: %zu, current queue size %zu usage %zu bytes", endpointHash, payload.size(), payloadsToSend_.size(), payloadsByteUsage_);
if (payloadsByteUsage_ + payload.size() > config_->get().max_send_queue_size) {
ELOGF_DEBUG(log_, TRANSPORT, "HttpTransportAsync::enqueue payloadsByteUsageLimit %zu reached. Payload will be dropped. enpointHash: %X payload size: %zu, current queue size %zu usage %zu bytes", config_->get().max_send_queue_size, endpointHash, payload.size(), payloadsToSend_.size(), payloadsByteUsage_);
return;
}
payloadsToSend_.emplace(endpointHash, std::vector<std::byte>(payload.begin(), payload.end()));
payloadsByteUsage_ += payload.size();
}
pauseCondition_.notify_all();
}
void prefork() final {
shutdownThread();
ELOGF_DEBUG(log_, TRANSPORT, "HttpTransportAsync::prefork payloads queue size %zu", payloadsToSend_.size());
CurlCleanup();
}
void postfork([[maybe_unused]] bool child) final {
CurlInit();
if (child && !payloadsToSend_.empty()) {
ELOGF_DEBUG(log_, TRANSPORT, "HttpTransportAsync::postfork child emptying payloads queue. %zu will be sent from parent", payloadsToSend_.size());
decltype(payloadsToSend_) q;
payloadsToSend_.swap(q);
}
working_ = true;
startThread();
pauseCondition_.notify_all();
}
protected:
void startThread() {
std::lock_guard<std::mutex> lock(mutex_);
if (!thread_) {
ELOGF_DEBUG(log_, TRANSPORT, "HttpTransportAsync startThread");
thread_ = std::make_unique<std::thread>([this]() { asyncSender(); });
}
}
void shutdownThread() {
{
std::lock_guard<std::mutex> lock(mutex_);
if (thread_) {
ELOGF_DEBUG(log_, TRANSPORT, "HttpTransportAsync shutdownThread");
}
working_ = false;
}
pauseCondition_.notify_all();
if (thread_ && thread_->joinable()) {
thread_->join();
}
thread_.reset();
}
void asyncSender() {
ELOGF_DEBUG(log_, TRANSPORT, "HttpTransportAsync::asyncSender blocking signals and starting work");
elasticapm::utils::blockApacheAndPHPSignals();
std::unique_lock<std::mutex> lock(mutex_);
while (working_) {
pauseCondition_.wait(lock, [this]() -> bool { return !payloadsToSend_.empty() || !working_; });
if (!working_ && !forceFlushOnDestruction_) {
break;
}
send(lock);
}
}
void send(std::unique_lock<std::mutex> &lockedPayloadsMutex) {
while (!payloadsToSend_.empty()) {
auto [endpointHash, payload] = std::move(payloadsToSend_.front());
payloadsToSend_.pop();
payloadsByteUsage_ -= payload.size();
ELOGF_TRACE(log_, TRANSPORT, "HttpTransportAsync::send enpointHash: %X payload size: %zu", endpointHash, payload.size());
lockedPayloadsMutex.unlock();
try {
auto [endpointUrl, headers, connId, conn, maxRetries, retryDelay] = endpoints_.getConnection(endpointHash);
try {
std::size_t retry = 0;
while (retry < maxRetries) {
auto responseCode = conn.sendPayload(endpointUrl, headers, payload);
ELOGF_TRACE(log_, TRANSPORT, "HttpTransportAsync::send enpointHash: %X connectionId: %X payload size: %zu responseCode %d", endpointHash, connId, payload.size(), static_cast<int>(responseCode));
if (responseCode >= 200 && responseCode < 300) {
break;
}
if (responseCode >= 400 && responseCode < 500 && responseCode != 408 && responseCode != 429) {
std::string msg = "server returned with code "s;
msg.append(std::to_string(responseCode));
throw std::runtime_error(msg);
}
retry++;
ELOGF_DEBUG(log_, TRANSPORT, "HttpTransportAsync::send enpointHash: %X connectionId: %X payload size: %zu retry %zu/%zu delay: %zu responseCode %d ", endpointHash, connId, payload.size(), retry, maxRetries, retryDelay.count(), static_cast<int>(responseCode));
std::this_thread::sleep_for(retryDelay);
}
ELOGF_DEBUG(log_, TRANSPORT, "HttpTransportAsync::send enpointHash: %X connectionId: %X payload size: %zu", endpointHash, connId, payload.size());
} catch (std::runtime_error const &e) {
ELOGF_WARNING(log_, TRANSPORT, "HttpTransportAsync::send exception '%s'. enpointHash: %X connectionId: %X payload size: %zu", e.what(), endpointHash, connId, payload.size());
}
} catch (std::runtime_error const &error) {
ELOGF_WARNING(log_, TRANSPORT, "HttpTransportAsync::send %s", error.what());
}
lockedPayloadsMutex.lock();
// it will break sending and emit log if class destructor was triggered, payloads queue is not empty and timeout was set and reached
if (forceFlushOnDestruction_ && !payloadsToSend_.empty() && config_->get().async_transport_shutdown_timeout.count() > 0 && ((std::chrono::steady_clock::now() - shutdownStart_) >= config_->get().async_transport_shutdown_timeout)) {
ELOGF_WARNING(log_, TRANSPORT, "Dropping %zu payloads because ELASTIC_OTEL_ASYNC_TRANSPORT_SHUTDOWN_TIMEOUT (%zums) was reached", payloadsToSend_.size(), config_->get().async_transport_shutdown_timeout.count());
break;
}
}
}
void CurlInit() {
auto curlInitResult = curl_global_init(CURL_GLOBAL_ALL);
if (curlInitResult != CURLE_OK) {
ELOGF_ERROR(log_, TRANSPORT, "HttpTransportAsync curl_global_init failed: %s (%d)", curl_easy_strerror(curlInitResult), (int)curlInitResult);
}
}
void CurlCleanup() {
curl_global_cleanup();
}
protected:
std::shared_ptr<LoggerInterface> log_;
std::shared_ptr<ConfigurationStorage> config_;
Endpoints endpoints_;
std::mutex mutex_;
std::queue<std::pair<endpointUrlHash_t, std::vector<std::byte>>> payloadsToSend_;
std::size_t payloadsByteUsage_ = 0;
std::unique_ptr<std::thread> thread_;
std::condition_variable pauseCondition_;
bool working_ = true;
std::atomic_bool forceFlushOnDestruction_ = false;
std::chrono::time_point<std::chrono::steady_clock> shutdownStart_;
};
} // namespace elasticapm::php::transport