in src/nova_act/impl/extension.py [0:0]
def dispatch_and_wait_for_prompt_completion(self, act: Act) -> ActResult | ActError:
    """Dispatch a pending act prompt and wait for reply from the extension.

    Calls `self._dispatch_prompt_and_wait_for_ack` to (1) add the pending prompt to
    `self._dispatched_prompts` and (2) post the message to the window. We then
    poll until `act.is_complete` becomes true or `act.timeout` seconds have
    elapsed. Completion is expected to be signalled when the extension posts a
    completion message to the console and
    `self.handle_autonomy_completion_message` records it in
    `self._completed_prompts`; this method itself only observes `act.is_complete`.

    While waiting, newly appended entries of `act.steps` are written to the
    trace logger, and, when `self._tty` is set, a Ctrl+X key press cancels the
    prompt without closing the browser.

    Args:
        act: The act to dispatch. Its `timeout`, `steps`, `is_complete`,
            `result` and `metadata` attributes are read, and `did_timeout` is
            set to True if the deadline passes before completion.

    Returns:
        An `ActResult` when the act succeeded; otherwise an error object:
        `ActCanceledError` for a canceled act, the output of `parse_errors`
        for a failed act, or `ActClientError` for any other result value.
    """
    self._playwright_manager.window_message_handler.bind(act)
    _LOGGER.debug(f"SDK version: {SDK_VERSION}")

    # chr(24) is the Ctrl+X control character. The watcher is only installed
    # when attached to a TTY; otherwise a no-op context manager yields None.
    kb_cm: KeyboardEventWatcher | ContextManager[None] = (
        KeyboardEventWatcher(chr(24), "ctrl+x", "stop agent act() call without quitting the browser")
        if self._tty
        else nullcontext()
    )

    with kb_cm as watcher:
        # dispatch request to Extension
        retry_call(
            self._dispatch_prompt_and_wait_for_ack,
            fkwargs={"act": act},
            exceptions=ActDispatchError,
            delay=DEFAULT_RETRY_DELAY,
            tries=DEFAULT_RETRY_TRIES if self._retry else 1,
        )

        end_time = time.time() + act.timeout
        # Exactly one scroller is created. A second instance built with a
        # `condition_check` callback used to be constructed first and then
        # immediately overwritten, so it never had any effect.
        scroller = LoadScroller()
        num_steps_observed = 0

        while time.time() < end_time:
            self._poll_playwright(DEFAULT_POLL_SLEEP_S)
            if not is_quiet():
                scroller.scroll()

            # Log only the steps that appeared since the previous iteration.
            if len(act.steps) > num_steps_observed:
                for step in act.steps[num_steps_observed:]:
                    model_response = step.model_output.awl_raw_program
                    # Bound to a name because backslashes are not allowed inside
                    # f-string expressions before Python 3.12.
                    newline = "\n"
                    formatted_response = (
                        f"\n{get_session_id_prefix()}{model_response.replace(newline, newline + '>> ')}"
                    )
                    _TRACE_LOGGER.info(formatted_response)
                num_steps_observed = len(act.steps)

            if self._tty:
                # With a TTY, kb_cm is a KeyboardEventWatcher, so `watcher` is set.
                assert watcher is not None
                triggered = watcher.is_triggered()
                if triggered:
                    _TRACE_LOGGER.info(f"\n{get_session_id_prefix()}Terminating agent workflow")
                    self.cancel_prompt(act)

            if act.is_complete:
                break
        else:
            # The loop ended without `break`: the deadline passed before the
            # act completed, so flag the timeout and cancel the prompt.
            act.did_timeout = True
            self.cancel_prompt(act)

    if self._run_info_compiler:
        self._run_info_compiler.compile(act)

    # Translate the act's terminal state into the caller-facing result type.
    result = act.result
    output: ActResult | ActError
    if isinstance(result, ActCanceled):
        output = ActCanceledError(metadata=act.metadata)
    elif isinstance(result, ActFailed):
        output = parse_errors(act, self._backend_info)
    elif isinstance(result, ActSucceeded):
        output = ActResult(
            response=result.response,
            metadata=act.metadata,
        )
    else:
        output = ActClientError(message="Unhandled act result", metadata=act.metadata)
    return output