in src/vw-serving/src/vw_serving/firehose_producer.py [0:0]
def __init__(self, stream_name, batch_size=50,
             batch_time=.2, max_retries=5, threads=2,
             firehose_client=None):
    """Set up the producer and, when enabled, start the buffering monitor.

    Buffering is switched on only when the environment variable named by
    ``environment.FIREHOSE_BUFFER_ON`` holds ``'true'`` (compared
    case-insensitively); when the variable is unset it is treated as
    ``'false'``.

    Args:
        stream_name: Stored as ``self.stream_name``.
        batch_size: Stored as ``self.batch_size`` in buffered mode only.
        batch_time: Stored as ``self.batch_time`` in buffered mode only;
            the startup log message reports it in seconds.
        max_retries: Stored as ``self.max_retries``.
        threads: Passed to ``ThreadPoolExecutor`` as the worker count.
        firehose_client: Client object to use. When ``None``, one is
            created with ``boto3.client('firehose')``.
    """
    self.stream_name = stream_name
    self.max_retries = max_retries

    buffer_flag = os.getenv(environment.FIREHOSE_BUFFER_ON, 'false')
    self.buffer_on = buffer_flag.lower() == 'true'

    # Only build a default client when the caller did not inject one.
    self.firehose_client = (
        boto3.client('firehose') if firehose_client is None
        else firehose_client
    )
    self.pool = ThreadPoolExecutor(threads)

    if not self.buffer_on:
        logger.info("Write data directly to Firehose without batching")
    else:
        self.queue = Queue()
        self.batch_size = batch_size
        self.batch_time = batch_time
        self.last_flush = time.time()

        # The event is set before the monitor is submitted, so the flag is
        # already raised by the time self.monitor starts running.
        running_flag = threading.Event()
        running_flag.set()
        self.monitor_running = running_flag
        self.pool.submit(self.monitor)
        logger.info(f"Buffering data with batch_size {self.batch_size} and batch_time {self.batch_time}s before push to Firehose")

    # Registered in both modes so close() is invoked at interpreter exit.
    atexit.register(self.close)