in src/vw-serving/src/vw_serving/firehose_producer.py [0:0]
def send_records(self, records, attempt=0):
    """Send records to the Firehose stream.

    Failed records are sent again with an exponential backoff between
    attempts. Nothing is sent when there are no records to deliver.

    Parameters
    ----------
    records : list
        Formatted records to send.
    attempt : int
        Number of times the records have been sent without success.
    """
    pending = records
    # Loop instead of recursing; an empty batch is never submitted, since
    # the Firehose API rejects a request without records.
    while pending:
        # If we already tried more times than we wanted, give up.
        # NOTE(review): the message mentions a file, but this method only
        # logs and returns -- nothing is persisted here.
        if attempt > self.max_retries:
            logger.warning('Writing {} records to file'.format(len(pending)))
            return

        # Sleep before retrying: 0.2s, 0.4s, 0.8s, ...
        if attempt:
            time.sleep(2 ** attempt * .1)

        response = self.firehose_client.put_record_batch(
            DeliveryStreamName=self.stream_name,
            Records=pending,
        )
        if not response['FailedPutCount']:
            return

        logger.warning('Retrying failed records')
        # Response entries are matched to the submitted records by position;
        # keep only the records whose entry reports an error code.
        pending = [
            record
            for record, result in zip(pending, response['RequestResponses'])
            if result.get('ErrorCode')
        ]
        attempt += 1