def __init__()

in src/vw-serving/src/vw_serving/firehose_producer.py [0:0]


    def __init__(self, stream_name, batch_size=50,
                 batch_time=.2, max_retries=5, threads=2,
                 firehose_client=None):
        """Configure the producer and, if enabled, start the buffering monitor.

        Buffering is switched on only when the environment variable named by
        ``environment.FIREHOSE_BUFFER_ON`` is set to ``true`` (compared
        case-insensitively); when the variable is absent it defaults to off.

        Args:
            stream_name: Stored as ``self.stream_name``.
            batch_size: Stored as ``self.batch_size``; only set when buffering
                is on.
            batch_time: Stored as ``self.batch_time`` (logged as seconds);
                only set when buffering is on.
            max_retries: Stored as ``self.max_retries``.
            threads: Worker count passed to the ``ThreadPoolExecutor``.
            firehose_client: Client to use; when ``None`` a new
                ``boto3.client('firehose')`` is created.

        Side effects:
            Registers ``self.close`` with ``atexit``. When buffering is on,
            also submits ``self.monitor`` to the thread pool.
        """
        self.stream_name = stream_name

        buffer_flag = os.getenv(environment.FIREHOSE_BUFFER_ON, 'false')
        self.buffer_on = buffer_flag.lower() == 'true'

        self.max_retries = max_retries
        # Only build a default client when the caller did not inject one.
        self.firehose_client = (
            firehose_client if firehose_client is not None
            else boto3.client('firehose')
        )
        self.pool = ThreadPoolExecutor(threads)

        if not self.buffer_on:
            logger.info("Write data directly to Firehose without batching")
        else:
            # All buffering state must exist before the monitor is submitted,
            # since the monitor may start running on a pool thread right away.
            self.queue = Queue()
            self.batch_size = batch_size
            self.batch_time = batch_time
            self.last_flush = time.time()

            running = threading.Event()
            running.set()
            self.monitor_running = running

            # NOTE(review): the monitor shares the pool with other submitted
            # work; if it runs for the producer's lifetime it presumably
            # occupies one worker -- confirm ``threads`` accounts for that.
            self.pool.submit(self.monitor)
            logger.info(f"Buffering data with batch_size {self.batch_size} and batch_time {self.batch_time}s before push to Firehose")

        # Registered in both modes so close() runs at interpreter exit.
        atexit.register(self.close)