in src/nova_act/impl/window_messages.py [0:0]
def handle_message(self, encrypted_message: dict | str):
"""Register observations in bound Act, if any"""
try:
_LOGGER.debug("Got message %s", encrypted_message)
if isinstance(encrypted_message, str) and encrypted_message == "ping":
return
if not isinstance(encrypted_message, dict):
raise ValueError("Message must be a dict")
message = self._encrypter.decrypt(encrypted_message)
if message is None:
return
_LOGGER.debug("Decrypted %s", message)
message_type = message.get("type")
if self._page_state is not None:
if message_type == COMPLETION_PROMPT_TYPE:
self._page_state.is_settled = True
if self._act is not None:
if message_type == COMPLETION_PROMPT_TYPE:
response = message.get("response")
if response is None:
raise ValueError("Completion message missing response")
completion_type = response.get("type")
if completion_type == "success":
self._act.complete(response.get("result"))
elif completion_type == "canceled":
self._act.cancel()
elif completion_type == "error":
self._act.fail(response)
if message_type == REQUEST_ACCEPTED_PROMPT_TYPE:
self._act.acknowledged = True
if message_type == STEP_OBSERVATION_PROMPT_TYPE:
self._act.add_step(Step.from_message(message))
except Exception as ex:
_LOGGER.error("Error handling message in dispatcher: %s", ex, exc_info=True)
raise