spectator/writer/udp_writer.py (25 lines of code) (raw):

import socket from ipaddress import ip_address, IPv6Address from typing import Tuple from spectator.writer import Writer class UdpWriter(Writer): """Writer that outputs data to a UDP socket.""" def __init__(self, location: str, address: Tuple[str, int]) -> None: super().__init__() self._logger.debug("initialize UdpWriter to %s", location) self._address = address try: if type(ip_address(self._address[0])) is IPv6Address: self._family = socket.AF_INET6 else: self._family = socket.AF_INET except ValueError: # anything that does not appear to be an IPv4 or IPv6 address (i.e. hostnames) self._family = socket.AF_INET def write(self, line: str) -> None: self._logger.debug("write line=%s", line) try: with socket.socket(self._family, socket.SOCK_DGRAM) as s: s.sendto(bytes(line, encoding="utf-8"), self._address) except IOError: self._logger.error("failed to write line=%s", line) def close(self) -> None: pass