in src/nova_act/impl/extension.py [0:0]
def _dispatch_prompt_and_wait_for_ack(self, act: Act):
    """Dispatch an act prompt to the extension and wait for its acknowledgment.

    Post a message with `type: autonomy-pending-prompt` within the browser context.
    The extension listens for messages of this type to initiate program runs.

    The message is encrypted with the Playwright manager's encrypter before it is
    posted on the main page. Afterwards Playwright is polled in slices of
    ``EXTENSION_POLL_SLEEP_S`` until ``act.acknowledged`` becomes truthy or
    ``EXTENSION_TIMEOUT_S`` seconds have elapsed.

    See also:
    * https://developer.mozilla.org/en-US/docs/Web/API/Window/postMessage

    Args:
        act: The act to dispatch. Its prompt, id, endpoint name and optional
            model settings are forwarded in the message.

    Raises:
        ActDispatchError: If the act is still not acknowledged once the
            timeout has elapsed.
    """
    pending_action_message = {
        "type": DISPATCH_PROMPT_TYPE,
        "pendingPrompt": act.prompt,
        "apiKey": self._nova_act_api_key,
        "uuid": act.id,
        "endpointName": act.endpoint_name,
        "hostname": self._backend_info.api_uri,
        "sessionId": self._session_id,
        "useBedrock": True,
    }

    # Optional settings are only included when set, and are sent as strings.
    # NOTE: max_steps uses a truthiness test, so both None and 0 omit the field,
    # whereas the model_* settings are forwarded whenever they are not None.
    if act.max_steps:
        pending_action_message["maxSteps"] = str(act.max_steps)
    if act.model_temperature is not None:
        pending_action_message["modelTemperature"] = str(act.model_temperature)
    if act.model_top_k is not None:
        pending_action_message["modelTopK"] = str(act.model_top_k)
    if act.model_seed is not None:
        pending_action_message["modelSeed"] = str(act.model_seed)

    encrypted_message = self._playwright_manager.encrypter.encrypt(pending_action_message)

    try:
        self._playwright_manager.main_page.evaluate(POST_MESSAGE_EXPRESSION, encrypted_message)
    except PlaywrightError:
        # Best effort: a failed evaluate is not fatal by itself. We still wait
        # for the ack below; if the message never arrived, the wait times out
        # and raises ActDispatchError.
        if self._verbose_errors:
            _LOGGER.error("Encountered PlaywrightError", exc_info=True)

    # Use the monotonic clock for the deadline so that system clock adjustments
    # during the wait cannot shorten or extend the timeout.
    deadline = time.monotonic() + EXTENSION_TIMEOUT_S

    # Check the ack flag *before* the deadline on every pass, so an
    # acknowledgment that lands during the final poll is still honoured
    # instead of being reported as a dispatch failure.
    while not act.acknowledged:
        if time.monotonic() >= deadline:
            raise ActDispatchError(
                message="Failed to receive Acknowledgment after dispatching a prompt to browser",
                metadata=act.metadata,
            )
        # presumably polling lets the handler that sets act.acknowledged run — verify
        self._poll_playwright(EXTENSION_POLL_SLEEP_S)