in src/databao_context_engine/mcp/mcp_server.py [0:0]
def _create_mcp_server(self, host: str | None = None, port: int | None = None) -> FastMCP:
    """Build a FastMCP server and register this engine's tools on it.

    Two tools are registered, both annotated as read-only, idempotent and
    closed-world:

    * ``all_results_tool`` -- delegates to ``run_all_results_tool``.
    * ``retrieve_tool`` -- delegates to ``run_retrieve_tool``.

    Both tools close over ``self`` and read ``self._databao_context_engine``
    and ``self._run_name`` when they are invoked.

    Args:
        host: Host passed to ``FastMCP``. A falsy value (``None`` or ``""``)
            falls back to ``"127.0.0.1"``.
        port: Port passed to ``FastMCP``. A falsy value (``None`` or ``0``)
            falls back to ``8000``.

    Returns:
        The configured ``FastMCP`` instance with both tools registered.
    """
    mcp = FastMCP(host=host or "127.0.0.1", port=port or 8000, lifespan=mcp_server_lifespan)

    # Tool documentation is supplied through ``description=``; the nested
    # functions deliberately carry comments rather than docstrings.
    @mcp.tool(
        description="Retrieve the contents of the all_results file",
        annotations=ToolAnnotations(readOnlyHint=True, idempotentHint=True, openWorldHint=False),
    )
    def all_results_tool():
        return run_all_results_tool(self._databao_context_engine, self._run_name)

    @mcp.tool(
        description="Retrieve the context built from various resources, including databases, dbt tools, plain and structured files, to retrieve relevant information",
        annotations=ToolAnnotations(readOnlyHint=True, idempotentHint=True, openWorldHint=False),
    )
    # ``limit`` defaults to None so that callers may omit it entirely; without
    # a default the argument is required even though the body already has a
    # fallback for a missing value.
    def retrieve_tool(text: str, limit: int | None = None):
        return run_retrieve_tool(
            databao_context_engine=self._databao_context_engine,
            run_name=self._run_name,
            text=text,
            # Any falsy limit (None or 0) is replaced by the default of 50.
            limit=limit or 50,
        )

    return mcp