in lca-ai-stack/source/lambda_functions/call_event_stream_processor/tumbling_window_state/call_state_manager.py [0:0]
def _prune_state(self):
    """Remove inactive and ended calls from ``self._state["StatePerCallId"]``.

    A call is treated as inactive when its ``UpdatedAt`` string sorts before
    the ISO timestamp of "now (UTC) minus ``self._max_inactivity_in_secs``".
    A call with no ``UpdatedAt`` defaults to ``""`` and is therefore inactive.

    A call is treated as ended when its ``Status`` is ``"ENDED"`` and its id
    is not in ``self._changed_call_ids``.

    Both groups are popped from the per-call state. Returns ``None``.
    """
    cutoff = (
        datetime.utcnow() - timedelta(seconds=self._max_inactivity_in_secs)
    ).isoformat()

    # First pass: calls whose last update is older than the cutoff.
    # Timestamps are compared as strings, not as datetime objects.
    stale_ids = set()
    for call_id, call_state in self._state.get("StatePerCallId", {}).items():
        if call_state.get("UpdatedAt", "") < cutoff:
            stale_ids.add(call_id)

    if stale_ids:
        # TODO pylint: disable=fixme
        # may want to set the state to ENDED or FAILED
        LOGGER.warning("inactive call_ids: %s", stale_ids)

    # Second pass: calls that have ended.
    finished_ids = set()
    for call_id, call_state in self._state.get("StatePerCallId", {}).items():
        if call_state.get("Status", "") != "ENDED":
            continue
        # only prune if the call wasn't updated in this batch to allow late changes to arrive
        if call_id in self._changed_call_ids:
            continue
        finished_ids.add(call_id)

    doomed_ids = stale_ids | finished_ids
    if not doomed_ids:
        # Nothing to prune; state is left untouched.
        return

    LOGGER.debug(
        "call ids to delete from state",
        extra={"call_ids_to_delete": list(doomed_ids)},
    )
    for call_id in doomed_ids:
        self._state["StatePerCallId"].pop(call_id)