def mainloop()

in kinesisresponder/kinesis_responder.py [0:0]


    def mainloop(self):
        """
        Main loop for processing the stream.

        Repeatedly fetches batches of up to 10 records using the current shard
        iterator, tracks each record in a ``KinesisTracker`` object (persisted
        only when ``self.should_save`` is truthy) and passes the record data and
        its approximate arrival time to ``self.process``.  The loop ends when no
        further shard iterator is available.

        Failure handling while fetching:

        * expired iterator - a new iterator is requested and the fetch retried
        * provisioned throughput exceeded - sleep, then retry; the delay doubles
          on each consecutive failure (capped) and halves again after successes
        * ``JSONResponseError`` - credentials are refreshed when the error code
          is ``ExpiredTokenException``; any other code is logged and retried
          after the same back-off delay

        An exception raised by ``self.process`` is logged, reported through
        ``inform_sentry_exception`` and recorded on the tracker (``ST_ERROR``);
        it does not stop the loop.

        :return: None
        """
        from pprint import pformat
        from .sentry import inform_sentry_exception
        sleep_delay = 1
        # Upper bound (seconds) so repeated failures cannot grow the delay without limit.
        max_sleep_delay = 64

        logger.info("Starting up responder thread for shard {0}".format(self.shard_id))
        iterator = self.new_shard_iterator()
        logger.debug("shard iterator is {0}".format(iterator))
        while iterator is not None:
            try:
                record = self._conn.get_records(iterator, limit=10)
            except kinesis.exceptions.ExpiredIteratorException as e:
                logger.warning("Received expired iterator exception, getting new iterator: {0}".format(str(e)))
                iterator = self.new_shard_iterator()
                continue
            except kinesis.exceptions.ProvisionedThroughputExceededException:
                logger.warning("Provisioned throughput exceeded, retrying in {0}s".format(sleep_delay))
                sleep(sleep_delay)
                sleep_delay = min(sleep_delay * 2, max_sleep_delay)
                continue
            except boto.exception.JSONResponseError as e:
                if e.error_code == 'ExpiredTokenException':
                    logger.warning("Access credentials expired, refreshing...")
                    self.refresh_access_credentials()
                else:
                    # Previously swallowed silently and retried immediately, which
                    # turns a persistent error into an invisible tight loop.
                    logger.error("Error {0} while getting records, retrying in {1}s: {2}".format(
                        e.error_code, sleep_delay, str(e)))
                    sleep(sleep_delay)
                    sleep_delay = min(sleep_delay * 2, max_sleep_delay)
                continue
            else:
                # Successful fetch: relax the back-off, never dropping below 1 second.
                if sleep_delay > 1:
                    sleep_delay = max(1, sleep_delay / 2)

            millis_behind = record['MillisBehindLatest']
            records = record['Records']

            # Build the delta from milliseconds directly; dividing by 1000 first
            # loses the sub-second part under integer division.
            time_lag = timedelta(milliseconds=millis_behind)
            logger.debug("Time lag to this record set is {0}".format(time_lag))
            logger.debug("Record set is dated {0}".format(datetime.now() - time_lag))

            logger.debug(pformat(record))
            for rec in records:
                seen_at = datetime.now()
                dbrec = KinesisTracker()
                dbrec.stream_name = self.stream_name
                dbrec.shard_id = self.shard_id
                dbrec.created = seen_at
                dbrec.updated = seen_at
                dbrec.sequence_number = rec['SequenceNumber']
                dbrec.status = KinesisTracker.ST_SEEN
                # NOTE(review): hard-coded placeholder value - confirm whether the
                # real host name should be recorded here.
                dbrec.processing_host = "myhost"
                dbrec.millis_behind_latest = millis_behind
                if self.should_save:
                    dbrec.save()

                dbrec.status = KinesisTracker.ST_PROCESSING
                if self.should_save:
                    dbrec.save()
                try:
                    self.process(rec['Data'], datetime.fromtimestamp(rec['ApproximateArrivalTimestamp']))
                    dbrec.status = KinesisTracker.ST_DONE
                    if self.should_save:
                        dbrec.save()
                except Exception as e:
                    # Deliberately broad: one failing record must not stop the
                    # responder; the failure is logged, reported and tracked.
                    trace = traceback.format_exc()
                    logger.error(trace)
                    inform_sentry_exception(extra_ctx={
                        "record": dbrec.__dict__
                    })

                    dbrec.status = KinesisTracker.ST_ERROR
                    dbrec.last_exception = str(e)
                    dbrec.exception_trace = trace
                    if self.should_save:
                        dbrec.save()

            # A missing or null NextShardIterator ends the loop instead of
            # raising KeyError.
            iterator = record.get('NextShardIterator')
            # Caught up with the stream and nothing new arrived: wait before
            # polling again (pointless when the loop is about to exit).
            if iterator is not None and not records and millis_behind == 0:
                sleep(10)
        logger.info("Ran out of shard records to read")