in source/idea/idea-sdk/src/ideasdk/dynamodb/dynamodb_stream_subscription.py [0:0]
def shard_processor(self):
    """
    iterate over all shards periodically, and read all records from the latest shard iterator.
    the anticipated volume of configuration updates is low to very low.
    since each application needs to read all configuration updates, shards are read sequentially.
    """
    while not self._exit.is_set():
        try:
            with self._shard_iterator_lock:
                # randomize shard polling order to avoid polling-limit conflicts across servers
                shard_ids = list(self.shard_iterators_map.keys())
                random.shuffle(shard_ids)
                for shard_id in shard_ids:
                    shard_iterator = self.shard_iterators_map[shard_id]
                    # shard is closed. skip it.
                    if shard_iterator is None:
                        continue
                    next_shard_iterator = shard_iterator
                    while True:
                        try:
                            get_records_result = self.kinesis_client.get_records(ShardIterator=next_shard_iterator, Limit=1000)
                        except botocore.exceptions.ClientError as e:
                            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                                # throttled: back off briefly and retry the same iterator
                                time.sleep(1)
                                continue
                            elif e.response['Error']['Code'] in ('ExpiredIteratorException', 'TrimmedDataAccessException'):
                                next_shard_iterator = None
                                break
                            else:
                                raise e
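                        # note: an expired or trimmed iterator is handled like a closed shard:
                        # the map entry is set to None below, so this shard is skipped on
                        # subsequent polling passes unless the iterator map is repopulated elsewhere.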
                        # when the shard is closed, the next shard iterator will be None
                        next_shard_iterator = get_records_result.get('NextShardIterator')
                        # records can be empty even when NextShardIterator is not None: the shard
                        # is still open, there is simply no new data to read yet.
                        records = get_records_result.get('Records', [])
                        if len(records) > 0:
                            self.log_info(f'{shard_id} - got {len(records)} records', logger=self.logger)
                        for record in records:
                            try:
                                record_data = json.loads(record['Data'])
                                event_name = record_data['eventName']
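                                # illustrative record shape (only the fields used below are shown;
                                # attribute names are placeholders):
                                # {
                                #     "eventName": "INSERT" | "MODIFY" | "REMOVE",
                                #     "dynamodb": {
                                #         "NewImage": {"key": {"S": "..."}},  # INSERT / MODIFY
                                #         "OldImage": {"key": {"S": "..."}}   # MODIFY / REMOVE
                                #     }
                                # }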
                                if event_name == 'INSERT':
                                    config_entry_raw = record_data['dynamodb']['NewImage']
                                    config_entry = {k: self.ddb_type_deserializer.deserialize(v) for k, v in config_entry_raw.items()}
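                                    # TypeDeserializer unwraps DynamoDB attribute values,
                                    # e.g. {'key': {'S': 'some.config.key'}} -> {'key': 'some.config.key'}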
                                    self.stream_subscriber.on_create(config_entry)
                                elif event_name == 'MODIFY':
                                    old_config_entry_raw = record_data['dynamodb']['OldImage']
                                    old_config_entry = {k: self.ddb_type_deserializer.deserialize(v) for k, v in old_config_entry_raw.items()}
                                    new_config_entry_raw = record_data['dynamodb']['NewImage']
                                    new_config_entry = {k: self.ddb_type_deserializer.deserialize(v) for k, v in new_config_entry_raw.items()}
                                    self.stream_subscriber.on_update(old_config_entry, new_config_entry)
                                elif event_name == 'REMOVE':
                                    config_entry_raw = record_data['dynamodb']['OldImage']
                                    config_entry = {k: self.ddb_type_deserializer.deserialize(v) for k, v in config_entry_raw.items()}
                                    self.stream_subscriber.on_delete(config_entry)
                            except Exception as e:
                                # log the raw record: record_data is unbound when json.loads fails
                                self.log_exception(f'failed to process {self.table_name} stream update: {e}, record: {record}')
                        # stop when there is nothing left to read, or when the shard is closed
                        # (a None iterator must not be passed back to get_records)
                        if len(records) == 0 or next_shard_iterator is None:
                            break
                        else:
                            time.sleep(1)
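                    # persist the latest iterator; None marks the shard as closed so it is skipped next pass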
                    self.shard_iterators_map[shard_id] = next_shard_iterator
        except Exception as e:
            self.log_exception(f'failed to process {self.table_name} update: {e}')
        finally:
            # each shard supports at most 5 GetRecords calls per second, so randomize the wait
            # to spread polling intervals out across all applications reading the stream
            self._exit.wait(random.randint(*SHARD_PROCESSOR_INTERVAL))
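
For context, a minimal sketch of how shard_iterators_map might be seeded before shard_processor runs. The helper name refresh_shard_iterators, the self.stream_name attribute, and the LATEST iterator type are assumptions for illustration; the module's actual initialization code is not shown in this excerpt.

def refresh_shard_iterators(self):
    """
    hypothetical sketch: discover shards and seed the iterator map with a LATEST
    iterator for each shard that is not yet being tracked.
    """
    with self._shard_iterator_lock:
        # list_shards and get_shard_iterator are standard boto3 kinesis client calls
        shards = self.kinesis_client.list_shards(StreamName=self.stream_name).get('Shards', [])
        for shard in shards:
            shard_id = shard['ShardId']
            # leave existing entries alone, including closed shards marked with None
            if shard_id in self.shard_iterators_map:
                continue
            result = self.kinesis_client.get_shard_iterator(
                StreamName=self.stream_name,  # assumption: the stream name is held on self
                ShardId=shard_id,
                ShardIteratorType='LATEST'  # assumption: only new configuration updates matter
            )
            self.shard_iterators_map[shard_id] = result['ShardIterator']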