in scripts/morph_router.py [0:0]
def create_app(args):
    """
    Creates and configures a FastAPI application instance for the MorphCloud router.

    The application exposes two routes:

    - ``GET /health``: returns ``{"status": "ok"}``.
    - ``POST /execute_batch``: runs every script of a ``BatchRequest`` in its own
      MorphCloud sandbox and returns one ``ScriptResult`` per script, in order.

    Args:
        args: An object containing configuration parameters for the application.
            - max_num_sandboxes (int): The maximum number of concurrent sandboxes allowed.
            - api_key (str): The MorphCloud API key to use.

    Returns:
        FastAPI: A configured FastAPI application instance.
    """
    import logging

    # Imported when the factory is called rather than at module import time.
    from morphcloud.api import MorphCloudClient
    from morphcloud.sandbox import Sandbox

    logger = logging.getLogger(__name__)

    app = FastAPI()
    app.state.client = MorphCloudClient(api_key=args.api_key)
    app.state.Sandbox = Sandbox
    # Shared by all requests: caps how many sandboxes are alive at once.
    app.state.sandbox_semaphore = asyncio.Semaphore(args.max_num_sandboxes)

    @app.get("/health")
    async def health():
        """Liveness probe."""
        return {"status": "ok"}

    @app.post("/execute_batch")
    async def execute_batch(batch: BatchRequest, request: Request):
        """Run each (script, language) pair of the batch and gather the results."""
        semaphore = request.app.state.sandbox_semaphore
        client = request.app.state.client
        sandbox_cls = request.app.state.Sandbox
        timeout = batch.timeout
        # Local guard around the blocking SDK call: one second more than the
        # timeout handed to the sandbox itself.
        asyncio_timeout = timeout + 1

        def sandbox_label(sandbox) -> str:
            """Best-effort identifier for log messages; never raises."""
            instance = getattr(sandbox, "_instance", None)
            return getattr(sandbox, "id", None) or getattr(instance, "id", "unknown")

        async def release_sandbox(sandbox) -> None:
            """Close and shut down the sandbox, attempting both steps independently."""
            for step in ("close", "shutdown"):
                try:
                    await asyncio.to_thread(getattr(sandbox, step))
                except Exception:
                    # Cleanup stays best-effort, but a failed step must not
                    # prevent the other one from running or go unnoticed.
                    logger.warning(
                        "Failed to %s sandbox %s",
                        step,
                        sandbox_label(sandbox),
                        exc_info=True,
                    )

        async def run_script(script: str, language: str) -> ScriptResult:
            """Execute one script in a fresh sandbox and map the outcome to a ScriptResult."""
            sandbox = None
            # The semaphore is held until cleanup finishes, so the limit applies
            # to live sandboxes and not only to running scripts.
            async with semaphore:
                try:
                    sandbox = await asyncio.to_thread(
                        sandbox_cls.new,
                        client=client,
                        ttl_seconds=timeout,
                    )
                    execution = await asyncio.wait_for(
                        asyncio.to_thread(
                            sandbox.run_code,
                            script,
                            language=language,
                            # NOTE(review): presumably milliseconds; confirm against the SDK.
                            timeout=timeout * 1000,
                        ),
                        timeout=asyncio_timeout,
                    )
                    # Prefer `text`, fall back to `stdout`; either may be missing or empty.
                    output = getattr(execution, "text", None) or getattr(execution, "stdout", None)
                    if output:
                        return ScriptResult(text=output, exception_str=None)
                    return ScriptResult(text="", exception_str="No output from execution")
                except Exception as e:
                    # Some exceptions (notably the TimeoutError raised by
                    # asyncio.wait_for) have an empty message; fall back to the
                    # class name so the failure is never reported as "".
                    return ScriptResult(text=None, exception_str=str(e) or type(e).__name__)
                finally:
                    if sandbox is not None:
                        await release_sandbox(sandbox)

        # zip() stops at the shorter of the two lists, so surplus scripts or
        # languages are ignored.
        tasks = [run_script(script, lang) for script, lang in zip(batch.scripts, batch.languages)]
        return await asyncio.gather(*tasks)

    return app