in lca-ai-stack/source/lambda_functions/call_event_stream_processor/tumbling_window_state/state_manager.py [0:0]
def _get_persisted_state_items_generator(self) -> Generator[Mapping[str, object], None, None]:
    """Yield persisted tumbling-window states newer than the max window size.

    Queries the DynamoDB table for items under the configured partition key
    whose sort key is greater than ``utcnow() - self._max_window_in_secs``
    (rendered as an ISO-8601 string). Results come newest first, and the
    generator follows ``LastEvaluatedKey`` pagination until the result set is
    exhausted.

    Each item's ``state_attr`` attribute is decoded as JSON:

    * string values are parsed directly;
    * binary values (states larger than MAX_DYNAMODB_JSON_SIZE are stored
      zlib-compressed) are decompressed, UTF-8 decoded, then parsed.

    Items whose state attribute is missing, empty, or of any other type are
    skipped. The same decoding is applied on every result page.

    Yields:
        The decoded state of each matching item, most recent first.

    Raises:
        json.JSONDecodeError: if a stored state is not valid JSON.
        zlib.error: if a binary state is not valid zlib data.
    """
    # TODO pylint: disable=fixme
    # change this to an async generator

    # Get the time delta based on the max window size.
    # NOTE(review): a naive utcnow() isoformat string is kept so the lower
    # bound compares lexically against sort keys that are presumably written
    # the same way (naive UTC ISO strings). Confirm against the writer before
    # switching to an aware datetime, which would append "+00:00".
    max_window_delta_timestamp = (
        datetime.utcnow() - timedelta(seconds=self._max_window_in_secs)
    ).isoformat()
    # Key expression with a sort key greater than the window start.
    key_condition_expression = Key(self._ddb_config["pk_name"]).eq(
        self._ddb_config["pk_value"]
    ) & Key(self._ddb_config["sk_name"]).gt(max_window_delta_timestamp)
    query_args: QueryInputTableTypeDef = dict(
        KeyConditionExpression=key_condition_expression,
        # Scan the index in reverse order since we want to return the most
        # recent items on top.
        ScanIndexForward=False,
        # Use consistent reads to improve the chances of getting state that is
        # concurrently being written.
        ConsistentRead=True,
    )

    # One loop handles the first page and every paginated page, so all pages
    # decode states identically.
    log_message = "tumbling window restore query response"
    while True:
        response = self._dynamodb_table.query(**query_args)
        LOGGER.debug(log_message, extra=dict(response=response))
        for item in response.get("Items", []):
            state = item.get(self._ddb_config["state_attr"], "")
            if not state:
                continue
            # The boto3 resource API deserializes DynamoDB binary ("B")
            # attributes into boto3.dynamodb.types.Binary, which keeps the raw
            # bytes on ``.value``. Unwrap it so compressed states are not
            # skipped. Plain str/bytes have no ``value`` attribute and pass
            # through unchanged.
            raw_state = getattr(state, "value", state)
            if isinstance(raw_state, str):
                yield json.loads(raw_state)
            elif isinstance(raw_state, (bytes, bytearray)):
                # States larger than MAX_DYNAMODB_JSON_SIZE are stored zlib
                # compressed.
                yield json.loads(zlib.decompress(raw_state).decode("utf-8"))
        # Paginate through responses until DynamoDB stops returning a cursor.
        if "LastEvaluatedKey" not in response:
            break
        query_args["ExclusiveStartKey"] = response["LastEvaluatedKey"]
        log_message = "tumbling window restore paginated query response"