computer-use-demo/computer_use_demo/tools/computer.py (326 lines of code) (raw):

import asyncio import base64 import os import shlex import shutil from enum import StrEnum from pathlib import Path from typing import Literal, TypedDict, cast, get_args from uuid import uuid4 from anthropic.types.beta import BetaToolComputerUse20241022Param, BetaToolUnionParam from .base import BaseAnthropicTool, ToolError, ToolResult from .run import run OUTPUT_DIR = "/tmp/outputs" TYPING_DELAY_MS = 12 TYPING_GROUP_SIZE = 50 Action_20241022 = Literal[ "key", "type", "mouse_move", "left_click", "left_click_drag", "right_click", "middle_click", "double_click", "screenshot", "cursor_position", ] Action_20250124 = ( Action_20241022 | Literal[ "left_mouse_down", "left_mouse_up", "scroll", "hold_key", "wait", "triple_click", ] ) ScrollDirection = Literal["up", "down", "left", "right"] class Resolution(TypedDict): width: int height: int # sizes above XGA/WXGA are not recommended (see README.md) # scale down to one of these targets if ComputerTool._scaling_enabled is set MAX_SCALING_TARGETS: dict[str, Resolution] = { "XGA": Resolution(width=1024, height=768), # 4:3 "WXGA": Resolution(width=1280, height=800), # 16:10 "FWXGA": Resolution(width=1366, height=768), # ~16:9 } CLICK_BUTTONS = { "left_click": 1, "right_click": 3, "middle_click": 2, "double_click": "--repeat 2 --delay 10 1", "triple_click": "--repeat 3 --delay 10 1", } class ScalingSource(StrEnum): COMPUTER = "computer" API = "api" class ComputerToolOptions(TypedDict): display_height_px: int display_width_px: int display_number: int | None def chunks(s: str, chunk_size: int) -> list[str]: return [s[i : i + chunk_size] for i in range(0, len(s), chunk_size)] class BaseComputerTool: """ A tool that allows the agent to interact with the screen, keyboard, and mouse of the current computer. The tool parameters are defined by Anthropic and are not editable. """ name: Literal["computer"] = "computer" width: int height: int display_num: int | None _screenshot_delay = 2.0 _scaling_enabled = True @property def options(self) -> ComputerToolOptions: width, height = self.scale_coordinates( ScalingSource.COMPUTER, self.width, self.height ) return { "display_width_px": width, "display_height_px": height, "display_number": self.display_num, } def __init__(self): super().__init__() self.width = int(os.getenv("WIDTH") or 0) self.height = int(os.getenv("HEIGHT") or 0) assert self.width and self.height, "WIDTH, HEIGHT must be set" if (display_num := os.getenv("DISPLAY_NUM")) is not None: self.display_num = int(display_num) self._display_prefix = f"DISPLAY=:{self.display_num} " else: self.display_num = None self._display_prefix = "" self.xdotool = f"{self._display_prefix}xdotool" async def __call__( self, *, action: Action_20241022, text: str | None = None, coordinate: tuple[int, int] | None = None, **kwargs, ): if action in ("mouse_move", "left_click_drag"): if coordinate is None: raise ToolError(f"coordinate is required for {action}") if text is not None: raise ToolError(f"text is not accepted for {action}") x, y = self.validate_and_get_coordinates(coordinate) if action == "mouse_move": command_parts = [self.xdotool, f"mousemove --sync {x} {y}"] return await self.shell(" ".join(command_parts)) elif action == "left_click_drag": command_parts = [ self.xdotool, f"mousedown 1 mousemove --sync {x} {y} mouseup 1", ] return await self.shell(" ".join(command_parts)) if action in ("key", "type"): if text is None: raise ToolError(f"text is required for {action}") if coordinate is not None: raise ToolError(f"coordinate is not accepted for {action}") if not isinstance(text, str): raise ToolError(output=f"{text} must be a string") if action == "key": command_parts = [self.xdotool, f"key -- {text}"] return await self.shell(" ".join(command_parts)) elif action == "type": results: list[ToolResult] = [] for chunk in chunks(text, TYPING_GROUP_SIZE): command_parts = [ self.xdotool, f"type --delay {TYPING_DELAY_MS} -- {shlex.quote(chunk)}", ] results.append( await self.shell(" ".join(command_parts), take_screenshot=False) ) screenshot_base64 = (await self.screenshot()).base64_image return ToolResult( output="".join(result.output or "" for result in results), error="".join(result.error or "" for result in results), base64_image=screenshot_base64, ) if action in ( "left_click", "right_click", "double_click", "middle_click", "screenshot", "cursor_position", ): if text is not None: raise ToolError(f"text is not accepted for {action}") if coordinate is not None: raise ToolError(f"coordinate is not accepted for {action}") if action == "screenshot": return await self.screenshot() elif action == "cursor_position": command_parts = [self.xdotool, "getmouselocation --shell"] result = await self.shell( " ".join(command_parts), take_screenshot=False, ) output = result.output or "" x, y = self.scale_coordinates( ScalingSource.COMPUTER, int(output.split("X=")[1].split("\n")[0]), int(output.split("Y=")[1].split("\n")[0]), ) return result.replace(output=f"X={x},Y={y}") else: command_parts = [self.xdotool, f"click {CLICK_BUTTONS[action]}"] return await self.shell(" ".join(command_parts)) raise ToolError(f"Invalid action: {action}") def validate_and_get_coordinates(self, coordinate: tuple[int, int] | None = None): if not isinstance(coordinate, list) or len(coordinate) != 2: raise ToolError(f"{coordinate} must be a tuple of length 2") if not all(isinstance(i, int) and i >= 0 for i in coordinate): raise ToolError(f"{coordinate} must be a tuple of non-negative ints") return self.scale_coordinates(ScalingSource.API, coordinate[0], coordinate[1]) async def screenshot(self): """Take a screenshot of the current screen and return the base64 encoded image.""" output_dir = Path(OUTPUT_DIR) output_dir.mkdir(parents=True, exist_ok=True) path = output_dir / f"screenshot_{uuid4().hex}.png" # Try gnome-screenshot first if shutil.which("gnome-screenshot"): screenshot_cmd = f"{self._display_prefix}gnome-screenshot -f {path} -p" else: # Fall back to scrot if gnome-screenshot isn't available screenshot_cmd = f"{self._display_prefix}scrot -p {path}" result = await self.shell(screenshot_cmd, take_screenshot=False) if self._scaling_enabled: x, y = self.scale_coordinates( ScalingSource.COMPUTER, self.width, self.height ) await self.shell( f"convert {path} -resize {x}x{y}! {path}", take_screenshot=False ) if path.exists(): return result.replace( base64_image=base64.b64encode(path.read_bytes()).decode() ) raise ToolError(f"Failed to take screenshot: {result.error}") async def shell(self, command: str, take_screenshot=True) -> ToolResult: """Run a shell command and return the output, error, and optionally a screenshot.""" _, stdout, stderr = await run(command) base64_image = None if take_screenshot: # delay to let things settle before taking a screenshot await asyncio.sleep(self._screenshot_delay) base64_image = (await self.screenshot()).base64_image return ToolResult(output=stdout, error=stderr, base64_image=base64_image) def scale_coordinates(self, source: ScalingSource, x: int, y: int): """Scale coordinates to a target maximum resolution.""" if not self._scaling_enabled: return x, y ratio = self.width / self.height target_dimension = None for dimension in MAX_SCALING_TARGETS.values(): # allow some error in the aspect ratio - not ratios are exactly 16:9 if abs(dimension["width"] / dimension["height"] - ratio) < 0.02: if dimension["width"] < self.width: target_dimension = dimension break if target_dimension is None: return x, y # should be less than 1 x_scaling_factor = target_dimension["width"] / self.width y_scaling_factor = target_dimension["height"] / self.height if source == ScalingSource.API: if x > self.width or y > self.height: raise ToolError(f"Coordinates {x}, {y} are out of bounds") # scale up return round(x / x_scaling_factor), round(y / y_scaling_factor) # scale down return round(x * x_scaling_factor), round(y * y_scaling_factor) class ComputerTool20241022(BaseComputerTool, BaseAnthropicTool): api_type: Literal["computer_20241022"] = "computer_20241022" def to_params(self) -> BetaToolComputerUse20241022Param: return {"name": self.name, "type": self.api_type, **self.options} class ComputerTool20250124(BaseComputerTool, BaseAnthropicTool): api_type: Literal["computer_20250124"] = "computer_20250124" def to_params(self): return cast( BetaToolUnionParam, {"name": self.name, "type": self.api_type, **self.options}, ) async def __call__( self, *, action: Action_20250124, text: str | None = None, coordinate: tuple[int, int] | None = None, scroll_direction: ScrollDirection | None = None, scroll_amount: int | None = None, duration: int | float | None = None, key: str | None = None, **kwargs, ): if action in ("left_mouse_down", "left_mouse_up"): if coordinate is not None: raise ToolError(f"coordinate is not accepted for {action=}.") command_parts = [ self.xdotool, f"{'mousedown' if action == 'left_mouse_down' else 'mouseup'} 1", ] return await self.shell(" ".join(command_parts)) if action == "scroll": if scroll_direction is None or scroll_direction not in get_args( ScrollDirection ): raise ToolError( f"{scroll_direction=} must be 'up', 'down', 'left', or 'right'" ) if not isinstance(scroll_amount, int) or scroll_amount < 0: raise ToolError(f"{scroll_amount=} must be a non-negative int") mouse_move_part = "" if coordinate is not None: x, y = self.validate_and_get_coordinates(coordinate) mouse_move_part = f"mousemove --sync {x} {y}" scroll_button = { "up": 4, "down": 5, "left": 6, "right": 7, }[scroll_direction] command_parts = [self.xdotool, mouse_move_part] if text: command_parts.append(f"keydown {text}") command_parts.append(f"click --repeat {scroll_amount} {scroll_button}") if text: command_parts.append(f"keyup {text}") return await self.shell(" ".join(command_parts)) if action in ("hold_key", "wait"): if duration is None or not isinstance(duration, (int, float)): raise ToolError(f"{duration=} must be a number") if duration < 0: raise ToolError(f"{duration=} must be non-negative") if duration > 100: raise ToolError(f"{duration=} is too long.") if action == "hold_key": if text is None: raise ToolError(f"text is required for {action}") escaped_keys = shlex.quote(text) command_parts = [ self.xdotool, f"keydown {escaped_keys}", f"sleep {duration}", f"keyup {escaped_keys}", ] return await self.shell(" ".join(command_parts)) if action == "wait": await asyncio.sleep(duration) return await self.screenshot() if action in ( "left_click", "right_click", "double_click", "triple_click", "middle_click", ): if text is not None: raise ToolError(f"text is not accepted for {action}") mouse_move_part = "" if coordinate is not None: x, y = self.validate_and_get_coordinates(coordinate) mouse_move_part = f"mousemove --sync {x} {y}" command_parts = [self.xdotool, mouse_move_part] if key: command_parts.append(f"keydown {key}") command_parts.append(f"click {CLICK_BUTTONS[action]}") if key: command_parts.append(f"keyup {key}") return await self.shell(" ".join(command_parts)) return await super().__call__( action=action, text=text, coordinate=coordinate, key=key, **kwargs )