aws_xray_sdk/core/emitters/udp_emitter.py (40 lines of code) (raw):
import logging
import socket
from aws_xray_sdk.core.daemon_config import DaemonConfig
from ..exceptions.exceptions import InvalidDaemonAddressException
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Header placed in front of every message built by UDPEmitter.send_entity.
PROTOCOL_HEADER = '{"format":"json","version":1}'
# Separator written between the header and the serialized entity.
PROTOCOL_DELIMITER = "\n"
# Address used by UDPEmitter when the caller does not supply one.
DEFAULT_DAEMON_ADDRESS = "127.0.0.1:2000"
class UDPEmitter:
    """
    The default emitter the X-Ray recorder uses to send segments/subsegments
    to the X-Ray daemon over UDP using a non-blocking socket. If there is an
    exception on the actual data transfer between the socket and the daemon,
    it logs the exception and continues.
    """

    def __init__(self, daemon_address=DEFAULT_DAEMON_ADDRESS):
        """
        Create the non-blocking UDP socket and record the daemon address.

        :param daemon_address: raw daemon address string, handed to
            ``set_daemon_address``. A falsy value leaves ``ip`` and
            ``port`` as ``None``.
        """
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._socket.setblocking(0)
        # Define both attributes up front: ``set_daemon_address`` is a no-op
        # for a falsy address, and the ``ip``/``port`` properties must not
        # raise AttributeError in that case.
        self._ip = None
        self._port = None
        self.set_daemon_address(daemon_address)

    def send_entity(self, entity):
        """
        Serializes a segment/subsegment and sends it to the X-Ray daemon
        over UDP. By default it doesn't retry on failures.

        Any exception raised while serializing or sending is logged and
        swallowed, so this method never propagates an error to the caller.

        :param entity: a trace entity to send to the X-Ray daemon
        """
        try:
            message = "%s%s%s" % (PROTOCOL_HEADER,
                                  PROTOCOL_DELIMITER,
                                  entity.serialize())
            # Pass the values as logging arguments so the (potentially large)
            # message is only formatted when DEBUG logging is enabled.
            log.debug("sending: %s to %s:%s.", message, self._ip, self._port)
            self._send_data(message)
        except Exception:
            log.exception("Failed to send entity to Daemon.")

    def set_daemon_address(self, address):
        """
        Set up UDP ip and port from the raw daemon address
        string using ``DaemonConfig`` class utilities.

        A falsy ``address`` (``None`` or an empty string) is ignored and the
        currently stored ip and port are kept.

        :param address: raw daemon address string
        """
        if address:
            daemon_config = DaemonConfig(address)
            self._ip, self._port = daemon_config.udp_ip, daemon_config.udp_port

    @property
    def ip(self):
        """The daemon ip currently in use, or ``None`` if none was set."""
        return self._ip

    @property
    def port(self):
        """The daemon port currently in use, or ``None`` if none was set."""
        return self._port

    def _send_data(self, data):
        """
        Encode ``data`` as UTF-8 and send it to the stored ip and port.

        Errors are not handled here; ``send_entity`` catches and logs them.

        :param data: the text message to send
        """
        self._socket.sendto(data.encode('utf-8'), (self._ip, self._port))

    def _parse_address(self, daemon_address):
        """
        Split a ``host:port`` string into a ``(host, port)`` tuple with the
        port converted to ``int``.

        Not called by the other methods of this class;
        ``set_daemon_address`` relies on ``DaemonConfig`` instead.

        :param daemon_address: address string in ``host:port`` form
        :raises InvalidDaemonAddressException: if the value cannot be split
            or the port is not an integer
        """
        try:
            val = daemon_address.split(':')
            return val[0], int(val[1])
        except Exception:
            raise InvalidDaemonAddressException('Invalid daemon address %s specified.' % daemon_address)