maga_transformer/aios/kmonitor/python_client/flume/pyflume.py (56 lines of code) (raw):

import logging, traceback from maga_transformer.aios.kmonitor.python_client.flume import ThriftSourceProtocol from thrift.transport import TTransport, TSocket from thrift.protocol import TCompactProtocol logger = logging.getLogger('flume') class _Transport(object): def __init__(self, thrift_host, thrift_port, timeout=None, unix_socket=None): self.thrift_host = thrift_host self.thrift_port = thrift_port self.timeout = timeout self.unix_socket = unix_socket self._socket = TSocket.TSocket(self.thrift_host, self.thrift_port, self.unix_socket) self._transport_factory = TTransport.TFramedTransportFactory() self._transport = self._transport_factory.getTransport(self._socket) def connect(self): try: if self.timeout: self._socket.setTimeout(self.timeout) if not self.is_open(): self._transport = self._transport_factory.getTransport(self._socket) self._transport.open() except Exception as e: logger.warn('connect to flume exception:%s', e) logger.warn(traceback.format_exc()) self.close() def reconnect(self): self.close() self.connect() def is_open(self): return self._transport.isOpen() def get_transport(self): return self._transport def close(self): self._transport.close() class FlumeClient(object): def __init__(self, thrift_host, thrift_port, timeout=10000, unix_socket=None): self._transObj = _Transport(thrift_host, thrift_port, timeout=timeout, unix_socket=unix_socket) self._protocol = TCompactProtocol.TCompactProtocol(trans=self._transObj.get_transport()) self.client = ThriftSourceProtocol.Client(iprot=self._protocol, oprot=self._protocol) self._transObj.connect() def send(self, event): try: self.client.send_append(event) except Exception as e: logger.warn('send to flume exception:%s', e) logger.warn(traceback.format_exc()) self._transObj.reconnect() def send_batch(self, events): try: self.client.send_appendBatch(events) except Exception as e: logger.warn('send batch to flume exception:%s', e) logger.warn(traceback.format_exc()) self._transObj.reconnect() def close(self): self._transObj.close()