agents/utils/tool_util.py (27 lines of code) (raw):
"""Tool execution utility with parallel execution support."""
import asyncio
from typing import Any
async def _execute_single_tool(
call: Any, tool_dict: dict[str, Any]
) -> dict[str, Any]:
"""Execute a single tool and handle errors."""
response = {"type": "tool_result", "tool_use_id": call.id}
try:
# Execute the tool directly
result = await tool_dict[call.name].execute(**call.input)
response["content"] = str(result)
except KeyError:
response["content"] = f"Tool '{call.name}' not found"
response["is_error"] = True
except Exception as e:
response["content"] = f"Error executing tool: {str(e)}"
response["is_error"] = True
return response
async def execute_tools(
tool_calls: list[Any], tool_dict: dict[str, Any], parallel: bool = True
) -> list[dict[str, Any]]:
"""Execute multiple tools sequentially or in parallel."""
if parallel:
return await asyncio.gather(
*[_execute_single_tool(call, tool_dict) for call in tool_calls]
)
else:
return [
await _execute_single_tool(call, tool_dict) for call in tool_calls
]