spectator/publisher.cc (112 lines of code) (raw):

#include "publisher.h" #include "logger.h" #include <fmt/format.h> namespace spectator { static const char NEW_LINE = '\n'; SpectatordPublisher::SpectatordPublisher(absl::string_view endpoint, uint32_t bytes_to_buffer, std::shared_ptr<spdlog::logger> logger) : logger_(std::move(logger)), udp_socket_(io_context_), local_socket_(io_context_), bytes_to_buffer_(bytes_to_buffer) { buffer_.reserve(bytes_to_buffer_ + 1024); if (absl::StartsWith(endpoint, "unix:")) { setup_unix_domain(endpoint.substr(5)); } else if (absl::StartsWith(endpoint, "udp:")) { auto pos = 4; // if the user used udp://foo:1234 instead of udp:foo:1234 // adjust accordingly if (endpoint.substr(pos, 2) == "//") { pos += 2; } setup_udp(endpoint.substr(pos)); } else if (endpoint != "disabled") { logger_->warn( "Unknown endpoint: '{}'. Expecting: 'unix:/path/to/socket'" " or 'udp:hostname:port' - Will not send metrics", std::string(endpoint)); setup_nop_sender(); } } void SpectatordPublisher::setup_nop_sender() { sender_ = [this](std::string_view msg) { logger_->trace("{}", msg); }; } void SpectatordPublisher::local_reconnect(absl::string_view path) { using endpoint_t = asio::local::datagram_protocol::endpoint; try { if (local_socket_.is_open()) { local_socket_.close(); } local_socket_.open(); local_socket_.connect(endpoint_t(std::string(path))); } catch (std::exception& e) { logger_->warn("Unable to connect to {}: {}", std::string(path), e.what()); } } void SpectatordPublisher::setup_unix_domain(absl::string_view path) { local_reconnect(path); // get a copy of the file path std::string local_path{path}; sender_ = [local_path, this](std::string_view msg) { buffer_.append(msg); if (buffer_.length() >= bytes_to_buffer_) { for (auto i = 0; i < 3; ++i) { try { auto sent_bytes = local_socket_.send(asio::buffer(buffer_)); logger_->trace("Sent (local): {} bytes, in total had {}", sent_bytes, buffer_.length()); break; } catch (std::exception& e) { local_reconnect(local_path); logger_->warn("Unable to send {} - attempt {}/3 ({})", buffer_, i, e.what()); } } buffer_.clear(); } else { buffer_.push_back(NEW_LINE); } }; } inline asio::ip::udp::endpoint resolve_host_port( asio::io_context& io_context, // NOLINT absl::string_view host_port) { using asio::ip::udp; udp::resolver resolver{io_context}; auto end_host = host_port.find(':'); if (end_host == std::string_view::npos) { auto err = fmt::format( "Unable to parse udp endpoint: '{}'. Expecting hostname:port", std::string(host_port)); throw std::runtime_error(err); } auto host = host_port.substr(0, end_host); auto port = host_port.substr(end_host + 1); return *resolver.resolve(udp::v6(), std::string(host), std::string(port)); } void SpectatordPublisher::udp_reconnect( const asio::ip::udp::endpoint& endpoint) { try { if (udp_socket_.is_open()) { udp_socket_.close(); } udp_socket_.open(asio::ip::udp::v6()); udp_socket_.connect(endpoint); } catch (std::exception& e) { logger_->warn("Unable to connect to {}: {}", endpoint.address().to_string(), endpoint.port()); } } void SpectatordPublisher::setup_udp(absl::string_view host_port) { auto endpoint = resolve_host_port(io_context_, host_port); udp_reconnect(endpoint); sender_ = [endpoint, this](std::string_view msg) { for (auto i = 0; i < 3; ++i) { try { udp_socket_.send(asio::buffer(msg)); logger_->trace("Sent (udp): {}", msg); break; } catch (std::exception& e) { logger_->warn("Unable to send {} - attempt {}/3", msg, i); udp_reconnect(endpoint); } } }; } } // namespace spectator