in prod/native/libcommon/code/transport/HttpTransportAsync.h [152:202]
// Drains payloadsToSend_, sending each queued payload to the endpoint identified by its hash.
//
// Locking: lockedPayloadsMutex must be locked on entry. It is released while a payload is being
// sent (including retry delays) and re-acquired before the queue is inspected again, so it is
// locked again when the function returns normally.
// NOTE(review): only std::runtime_error is handled here; any other exception type would propagate
// with the mutex still unlocked - confirm callers can cope with that.
//
// Per-payload policy:
//  - 2xx response: delivered, stop retrying.
//  - 4xx response other than 408 and 429: treated as permanent failure; the payload is dropped
//    and a warning is logged.
//  - any other response: retried, up to maxRetries attempts in total, waiting retryDelay between
//    attempts. When attempts are exhausted the payload is dropped and a warning is logged.
//
// Shutdown: once forceFlushOnDestruction_ is set and a non-zero async_transport_shutdown_timeout
// has elapsed since shutdownStart_, the loop stops and the remaining payloads stay unsent.
void send(std::unique_lock<std::mutex> &lockedPayloadsMutex) {
    while (!payloadsToSend_.empty()) {
        auto [endpointHash, payload] = std::move(payloadsToSend_.front());
        payloadsToSend_.pop();
        payloadsByteUsage_ -= payload.size();
        ELOGF_TRACE(log_, TRANSPORT, "HttpTransportAsync::send enpointHash: %X payload size: %zu", endpointHash, payload.size());

        // Release the queue mutex for the duration of the network I/O and retry delays.
        lockedPayloadsMutex.unlock();
        try {
            auto [endpointUrl, headers, connId, conn, maxRetries, retryDelay] = endpoints_.getConnection(endpointHash);
            try {
                bool delivered = false;
                std::size_t retry = 0; // number of failed attempts so far
                while (retry < maxRetries) {
                    auto responseCode = conn.sendPayload(endpointUrl, headers, payload);
                    ELOGF_TRACE(log_, TRANSPORT, "HttpTransportAsync::send enpointHash: %X connectionId: %X payload size: %zu responseCode %d", endpointHash, connId, payload.size(), static_cast<int>(responseCode));
                    if (responseCode >= 200 && responseCode < 300) {
                        delivered = true;
                        break;
                    }
                    // Client errors are not retried, except 408 (Request Timeout) and 429 (Too Many Requests).
                    if (responseCode >= 400 && responseCode < 500 && responseCode != 408 && responseCode != 429) {
                        std::string msg = "server returned with code "s;
                        msg.append(std::to_string(responseCode));
                        throw std::runtime_error(msg);
                    }
                    retry++;
                    ELOGF_DEBUG(log_, TRANSPORT, "HttpTransportAsync::send enpointHash: %X connectionId: %X payload size: %zu retry %zu/%zu delay: %zu responseCode %d ", endpointHash, connId, payload.size(), retry, maxRetries, retryDelay.count(), static_cast<int>(responseCode));
                    // Only wait when another attempt will follow; sleeping after the final failure
                    // would just stall the queue (and the shutdown timeout check below).
                    if (retry < maxRetries) {
                        std::this_thread::sleep_for(retryDelay);
                    }
                }
                if (!delivered) {
                    // Retries exhausted (or maxRetries is 0): make the dropped payload visible at warning level,
                    // consistent with the non-retryable failure path.
                    ELOGF_WARNING(log_, TRANSPORT, "HttpTransportAsync::send giving up after %zu attempts. enpointHash: %X connectionId: %X payload size: %zu", retry, endpointHash, connId, payload.size());
                }
                ELOGF_DEBUG(log_, TRANSPORT, "HttpTransportAsync::send enpointHash: %X connectionId: %X payload size: %zu", endpointHash, connId, payload.size());
            } catch (std::runtime_error const &e) {
                // Failure while sending on an already resolved connection: log with connection details and move on.
                ELOGF_WARNING(log_, TRANSPORT, "HttpTransportAsync::send exception '%s'. enpointHash: %X connectionId: %X payload size: %zu", e.what(), endpointHash, connId, payload.size());
            }
        } catch (std::runtime_error const &error) {
            // Failure while resolving the connection for this endpoint hash: the payload is dropped.
            ELOGF_WARNING(log_, TRANSPORT, "HttpTransportAsync::send %s", error.what());
        }
        lockedPayloadsMutex.lock();

        // it will break sending and emit log if class destructor was triggered, payloads queue is not empty and timeout was set and reached
        if (forceFlushOnDestruction_ && !payloadsToSend_.empty() && config_->get().async_transport_shutdown_timeout.count() > 0 && ((std::chrono::steady_clock::now() - shutdownStart_) >= config_->get().async_transport_shutdown_timeout)) {
            ELOGF_WARNING(log_, TRANSPORT, "Dropping %zu payloads because ELASTIC_OTEL_ASYNC_TRANSPORT_SHUTDOWN_TIMEOUT (%zums) was reached", payloadsToSend_.size(), config_->get().async_transport_shutdown_timeout.count());
            break;
        }
    }
}