def _get_persisted_state_items_generator()

in lca-ai-stack/source/lambda_functions/call_event_stream_processor/tumbling_window_state/state_manager.py [0:0]


    def _get_persisted_state_items_generator(self) -> Generator[Mapping[str, object], None, None]:
        """Yield the persisted state items that fall inside the maximum window.

        Queries the DynamoDB table for items under the configured partition key
        whose sort key is greater than the ISO timestamp of
        ``now - max_window_in_secs``, newest first, and follows
        ``LastEvaluatedKey`` until the result set is exhausted. Each item's
        state attribute is decoded from JSON; values stored as ``bytes`` are
        zlib-decompressed first. Items with an empty, missing, or otherwise
        typed state attribute are skipped.

        Being a generator, the queries are only issued as the caller iterates.

        Yields:
            The JSON-decoded state of each matching item.

        Raises:
            json.JSONDecodeError: if a stored state is not valid JSON.
            zlib.error: if a ``bytes`` state is not valid zlib data.
            Errors raised by the table query itself are not caught here.
        """
        # TODO  pylint: disable=fixme
        # change this to an async generator

        # oldest sort key value still inside the largest window; kept as a naive
        # UTC ISO string since it is compared against the stored sort keys
        # NOTE(review): assumes sort keys are written in the same format - confirm
        max_window_delta_timestamp = (
            datetime.utcnow() - timedelta(seconds=self._max_window_in_secs)
        ).isoformat()
        # match the partition key and only sort keys newer than the window start
        key_condition_expression = Key(self._ddb_config["pk_name"]).eq(
            self._ddb_config["pk_value"]
        ) & Key(self._ddb_config["sk_name"]).gt(max_window_delta_timestamp)
        query_args: QueryInputTableTypeDef = dict(
            KeyConditionExpression=key_condition_expression,
            # scan index in reverse order since we want to return the most
            # recent items on top
            ScanIndexForward=False,
            # use consistent reads to improve chances getting state concurrently being written
            ConsistentRead=True,
        )
        state_attr = self._ddb_config["state_attr"]

        # A single loop handles the first page and every following page so that
        # all pages share the same decoding rules. Previously the paginated
        # pages skipped compressed states, silently dropping them.
        log_message = "tumbling window restore query response"
        while True:
            response = self._dynamodb_table.query(**query_args)
            LOGGER.debug(log_message, extra=dict(response=response))

            for item in response.get("Items", []):
                state = item.get(state_attr, "")
                if not state:
                    continue
                if isinstance(state, str):
                    yield json.loads(state)
                # state larger than MAX_DYNAMODB_JSON_SIZE are stored zlib compressed
                # NOTE(review): boto3's resource layer may hand back a Binary
                # wrapper instead of raw bytes - confirm this branch is reached
                elif isinstance(state, bytes):
                    yield json.loads(zlib.decompress(state).decode("utf-8"))

            # no continuation key means this was the last page
            if "LastEvaluatedKey" not in response:
                break
            query_args["ExclusiveStartKey"] = response["LastEvaluatedKey"]
            log_message = "tumbling window restore paginated query response"