def shard_processor()

in source/idea/idea-sdk/src/ideasdk/dynamodb/dynamodb_stream_subscription.py [0:0]


    def shard_processor(self):
        """
        iterate over all shards periodically and read all available records from each shard's latest iterator.
        the anticipated volume of configuration updates is expected to be low to very low.

        since each application needs to read all configuration updates, all shards are read sequentially.
        :return:
        """
        while not self._exit.is_set():
            try:
                with self._shard_iterator_lock:

                    # randomize shard polling order to avoid hitting per-shard polling limits concurrently across servers
                    shard_ids = list(self.shard_iterators_map.keys())
                    random.shuffle(shard_ids)

                    for shard_id in shard_ids:

                        shard_iterator = self.shard_iterators_map[shard_id]

                        # shard is closed. skip
                        if shard_iterator is None:
                            continue

                        next_shard_iterator = shard_iterator
                        while True:
                            try:
                                get_records_result = self.kinesis_client.get_records(ShardIterator=next_shard_iterator, Limit=1000)
                            except botocore.exceptions.ClientError as e:
                                if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                                    time.sleep(1)
                                    continue
                                elif e.response['Error']['Code'] in ('ExpiredIteratorException', 'TrimmedDataAccessException'):
                                    next_shard_iterator = None
                                    break
                                else:
                                    raise

                            # when the shard is closed, next shard iterator will be None
                            next_shard_iterator = get_records_result.get('NextShardIterator')
                            # records can be empty even when NextShardIterator is not None, since the shard is not closed yet.
                            records = get_records_result.get('Records', [])

                            if len(records) > 0:
                                self.log_info(f'{shard_id} - got {len(records)} records', logger=self.logger)

                            for record in records:
                                try:
                                    record_data = json.loads(record['Data'])
                                    event_name = record_data['eventName']
                                    if event_name == 'INSERT':
                                        config_entry_raw = record_data['dynamodb']['NewImage']
                                        config_entry = {k: self.ddb_type_deserializer.deserialize(v) for k, v in config_entry_raw.items()}
                                        self.stream_subscriber.on_create(config_entry)
                                    elif event_name == 'MODIFY':
                                        old_config_entry_raw = record_data['dynamodb']['OldImage']
                                        old_config_entry = {k: self.ddb_type_deserializer.deserialize(v) for k, v in old_config_entry_raw.items()}
                                        new_config_entry_raw = record_data['dynamodb']['NewImage']
                                        new_config_entry = {k: self.ddb_type_deserializer.deserialize(v) for k, v in new_config_entry_raw.items()}
                                        self.stream_subscriber.on_update(old_config_entry, new_config_entry)
                                    elif event_name == 'REMOVE':
                                        config_entry_raw = record_data['dynamodb']['OldImage']
                                        config_entry = {k: self.ddb_type_deserializer.deserialize(v) for k, v in config_entry_raw.items()}
                                        self.stream_subscriber.on_delete(config_entry)
                                except Exception as e:
                                    self.log_exception(f'failed to process {self.table_name} stream update: {e}, record: {record}')

                            # stop when the shard has no more records, or when it is closed (no next iterator)
                            if len(records) == 0 or next_shard_iterator is None:
                                break
                            else:
                                time.sleep(1)

                        self.shard_iterators_map[shard_id] = next_shard_iterator

            except Exception as e:
                self.log_exception(f'failed to process {self.table_name} update: {e}')
            finally:
                # since each shard has a polling limit of 5 requests per second, ensure polling intervals are spread out across all applications
                self._exit.wait(random.randint(*SHARD_PROCESSOR_INTERVAL))
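Each deserialized configuration entry is handed to self.stream_subscriber, which shard_processor expects to expose on_create, on_update, and on_delete callbacks. The sketch below is a minimal, hypothetical subscriber (the class name, print-based handling, and the attribute names in new_image are illustrative assumptions, not part of the SDK), followed by a standalone illustration of the image deserialization that shard_processor performs with boto3's TypeDeserializer.

    from boto3.dynamodb.types import TypeDeserializer

    class LoggingConfigSubscriber:
        """hypothetical subscriber illustrating the callback contract used by shard_processor"""

        def on_create(self, entry: dict) -> None:
            # entry arrives already deserialized into plain python types
            print(f'config created: {entry}')

        def on_update(self, old_entry: dict, new_entry: dict) -> None:
            print(f'config updated: {old_entry} -> {new_entry}')

        def on_delete(self, entry: dict) -> None:
            print(f'config deleted: {entry}')

    # the same deserialization shard_processor applies to NewImage / OldImage attributes
    deserializer = TypeDeserializer()
    new_image = {'key': {'S': 'cluster.name'}, 'value': {'S': 'idea-prod'}}  # made-up attributes for illustration
    config_entry = {k: deserializer.deserialize(v) for k, v in new_image.items()}
    # config_entry == {'key': 'cluster.name', 'value': 'idea-prod'}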
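The loop consumes iterators from self.shard_iterators_map, which is populated and refreshed elsewhere in the class. As a rough sketch only (the stream name, client construction, and refresh policy are assumptions), a map of LATEST iterators for a Kinesis stream could be seeded like this:

    import boto3

    def build_latest_shard_iterators(stream_name: str) -> dict:
        # hypothetical helper: map every shard id to a LATEST shard iterator
        kinesis = boto3.client('kinesis')
        iterators = {}
        for shard in kinesis.list_shards(StreamName=stream_name).get('Shards', []):
            response = kinesis.get_shard_iterator(
                StreamName=stream_name,
                ShardId=shard['ShardId'],
                ShardIteratorType='LATEST',
            )
            iterators[shard['ShardId']] = response['ShardIterator']
        return iterators

Once a shard is closed, its map entry would be set to None by the processor above, and subsequent passes skip it until the map is rebuilt.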