in kinesisresponder/kinesis_responder.py [0:0]
def mainloop(self):
"""
Main loop for processing the stream
:return:
"""
from pprint import pformat
from .sentry import inform_sentry_exception
sleep_delay = 1
logger.info("Starting up responder thread for shard {0}".format(self.shard_id))
iterator = self.new_shard_iterator()
logger.debug("shard iterator is {0}".format(iterator))
while iterator is not None:
try:
record = self._conn.get_records(iterator,limit=10)
if sleep_delay>1:
sleep_delay /= 2
except kinesis.exceptions.ExpiredIteratorException as e:
logger.warning("Received expired iterator exception, getting new iterator: {0}".format(str(e)))
iterator = self.new_shard_iterator()
continue
except kinesis.exceptions.ProvisionedThroughputExceededException:
sleep(sleep_delay)
sleep_delay*=2
continue
except boto.exception.JSONResponseError as e:
if e.error_code=='ExpiredTokenException':
logger.warning("Access credentials expired, refreshing...")
self.refresh_access_credentials()
continue
time_lag = timedelta(seconds=record['MillisBehindLatest']/1000)
logger.debug("Time lag to this record set is {0}".format(time_lag))
logger.debug("Record set is dated {0}".format(datetime.now() - time_lag))
logger.debug(pformat(record))
for rec in record['Records']:
dbrec = KinesisTracker()
dbrec.stream_name = self.stream_name
dbrec.shard_id = self.shard_id
dbrec.created = datetime.now()
dbrec.updated = datetime.now()
dbrec.sequence_number = rec['SequenceNumber']
dbrec.status = KinesisTracker.ST_SEEN
dbrec.processing_host = "myhost"
dbrec.millis_behind_latest = record['MillisBehindLatest']
if self.should_save:
dbrec.save()
dbrec.status = KinesisTracker.ST_PROCESSING
if self.should_save:
dbrec.save()
try:
self.process(rec['Data'], datetime.fromtimestamp(rec['ApproximateArrivalTimestamp']))
dbrec.status = KinesisTracker.ST_DONE
if self.should_save:
dbrec.save()
except Exception as e:
logger.error(traceback.format_exc())
inform_sentry_exception(extra_ctx={
"record": dbrec.__dict__
})
dbrec.status = KinesisTracker.ST_ERROR
dbrec.last_exception = str(e)
dbrec.exception_trace = traceback.format_exc()
if self.should_save:
dbrec.save()
iterator = record['NextShardIterator']
if len(record['Records'])==0 and record['MillisBehindLatest']==0:
sleep(10)
logger.info("Ran out of shard records to read")