void send()

in prod/native/libcommon/code/transport/HttpTransportAsync.h [152:202]


    void send(std::unique_lock<std::mutex> &lockedPayloadsMutex) {
        while (!payloadsToSend_.empty()) {
            auto [endpointHash, payload] = std::move(payloadsToSend_.front());
            payloadsToSend_.pop();
            payloadsByteUsage_ -= payload.size();

            ELOGF_TRACE(log_, TRANSPORT, "HttpTransportAsync::send enpointHash: %X payload size: %zu", endpointHash, payload.size());

            lockedPayloadsMutex.unlock();

            try {
                auto [endpointUrl, headers, connId, conn, maxRetries, retryDelay] = endpoints_.getConnection(endpointHash);
                try {
                    std::size_t retry = 0;

                    while (retry < maxRetries) {
                        auto responseCode = conn.sendPayload(endpointUrl, headers, payload);

                        ELOGF_TRACE(log_, TRANSPORT, "HttpTransportAsync::send enpointHash: %X connectionId: %X payload size: %zu responseCode %d", endpointHash, connId, payload.size(), static_cast<int>(responseCode));

                        if (responseCode >= 200 && responseCode < 300) {
                            break;
                        }

                        if (responseCode >= 400 && responseCode < 500 && responseCode != 408 && responseCode != 429) {
                            std::string msg = "server returned with code "s;
                            msg.append(std::to_string(responseCode));
                            throw std::runtime_error(msg);
                        }

                        retry++;
                        ELOGF_DEBUG(log_, TRANSPORT, "HttpTransportAsync::send enpointHash: %X connectionId: %X payload size: %zu retry %zu/%zu delay: %zu responseCode %d ", endpointHash, connId, payload.size(), retry, maxRetries, retryDelay.count(), static_cast<int>(responseCode));
                        std::this_thread::sleep_for(retryDelay);
                    }
                    ELOGF_DEBUG(log_, TRANSPORT, "HttpTransportAsync::send enpointHash: %X connectionId: %X payload size: %zu", endpointHash, connId, payload.size());
                } catch (std::runtime_error const &e) {
                    ELOGF_WARNING(log_, TRANSPORT, "HttpTransportAsync::send exception '%s'. enpointHash: %X connectionId: %X payload size: %zu", e.what(), endpointHash, connId, payload.size());
                }
            } catch (std::runtime_error const &error) {
                ELOGF_WARNING(log_, TRANSPORT, "HttpTransportAsync::send %s", error.what());
            }

            lockedPayloadsMutex.lock();

            // it will break sending and emit log if class destructor was triggered, payloads queue is not empty and timeout was set and reached
            if (forceFlushOnDestruction_ && !payloadsToSend_.empty() && config_->get().async_transport_shutdown_timeout.count() > 0 && ((std::chrono::steady_clock::now() - shutdownStart_) >= config_->get().async_transport_shutdown_timeout)) {
                ELOGF_WARNING(log_, TRANSPORT, "Dropping %zu payloads because ELASTIC_OTEL_ASYNC_TRANSPORT_SHUTDOWN_TIMEOUT (%zums) was reached", payloadsToSend_.size(), config_->get().async_transport_shutdown_timeout.count());
                break;
            }
        }
    }