in src/smolagents/remote_executors.py [0:0]
def run_code_raise_errors(self, code_action: str) -> CodeOutput:
try:
# Send execute request
msg_id = self._send_execute_request(code_action)
# Collect output and results
outputs = []
result = None
is_final_answer = False
while True:
msg = json.loads(self.ws.recv())
parent_msg_id = msg.get("parent_header", {}).get("msg_id")
# Skip unrelated messages
if parent_msg_id != msg_id:
continue
msg_type = msg.get("msg_type", "")
msg_content = msg.get("content", {})
if msg_type == "stream":
outputs.append(msg_content["text"])
elif msg_type == "execute_result":
result = msg_content["data"].get("text/plain", None)
elif msg_type == "error":
if msg_content.get("ename", "") == RemotePythonExecutor.FINAL_ANSWER_EXCEPTION:
result = pickle.loads(base64.b64decode(msg_content.get("evalue", "")))
is_final_answer = True
else:
raise AgentError("\n".join(msg_content.get("traceback", [])), self.logger)
elif msg_type == "status" and msg_content["execution_state"] == "idle":
break
return CodeOutput(output=result, logs="".join(outputs), is_final_answer=is_final_answer)
except Exception as e:
self.logger.log_error(f"Code execution failed: {e}")
raise