tablestore-java-mcp-server-rag/knowledge-data-generator/knowledge_manager.py (146 lines of code) (raw):

"""Command-line tool that imports, searches and chats over a knowledge base.

The script connects to an MCP server over SSE, exposes the server's tools to
an LLM through the chat-completions tool-calling interface, and lets the LLM
drive those tools.

NOTE(review): ``llm_client``, ``LLM_MODEL``, ``MCP_SERVER_HOST`` and the
``*_prompt_template`` names are not defined in this file; they are expected
to come from ``from config import *`` -- confirm against config.py.
"""

import json
from mcp import ClientSession
from mcp.client.sse import sse_client
from typing import Any, List, Optional
from contextlib import AsyncExitStack
import asyncio
import chunk
from config import *
import sys


class MCPClient:
    """
    Async client wrapper around an MCP server reached over SSE.

    The SSE transport and the ``ClientSession`` are both registered on a
    single ``AsyncExitStack`` so they are torn down together, in reverse
    order of creation. Intended to be used as ``async with MCPClient(host)``.
    """

    def __init__(self, host: str):
        """Store the server address; no connection is made until ``connect``.

        Args:
            host: Address handed to ``sse_client`` when connecting.
        """
        self.host = host
        self.exit_stack = AsyncExitStack()
        self.session = None
        self._client = None

    async def __aenter__(self):
        """Connect to the server and return this client."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the session and transport opened by ``connect``.

        Both context managers were entered through ``self.exit_stack``, so
        the stack is the owner responsible for exiting them. Unwinding the
        stack closes the session first and the transport second, and also
        runs the transport cleanup when only part of ``connect`` succeeded.
        The stack's return value is deliberately ignored so that an in-flight
        exception is never suppressed.
        """
        try:
            await self.exit_stack.__aexit__(exc_type, exc_val, exc_tb)
        finally:
            # Make a stale client fail fast with "Not connected" afterwards.
            self.session = None
            self._client = None

    async def connect(self):
        """Open the SSE transport and start a ``ClientSession`` on top of it.

        NOTE(review): no ``session.initialize()`` call is made here; confirm
        whether the MCP SDK version in use requires it before listing or
        calling tools.
        """
        self._client = sse_client(self.host, timeout=10)
        sse_transport = await self.exit_stack.enter_async_context(self._client)
        read, write = sse_transport
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(read, write)
        )

    async def get_available_tools(self) -> List[Any]:
        """
        Retrieve the list of tools advertised by the MCP server.

        Returns:
            The ``tools`` attribute of the server's ``list_tools`` response.

        Raises:
            RuntimeError: If the client is not connected.
        """
        if not self.session:
            raise RuntimeError("Not connected to MCP server")

        tools = await self.session.list_tools()
        return tools.tools

    def call_tool(self, tool_name: str, args) -> Any:
        """
        Start a tool invocation on the MCP server.

        This method is synchronous and returns whatever
        ``session.call_tool`` returns without awaiting it; callers await the
        result themselves (``await client.call_tool(name, args)``).

        Args:
            tool_name: The name of the tool to invoke.
            args: Arguments forwarded to the tool.

        Returns:
            The un-awaited result of ``session.call_tool(tool_name, args)``.

        Raises:
            RuntimeError: If the client is not connected (raised immediately,
                before anything is awaited).
        """
        if not self.session:
            raise RuntimeError("Not connected to MCP server")

        return self.session.call_tool(tool_name, args)


async def agent_loop(mcp_client, query: str, tools,
                     messages: Optional[List[dict]] = None):
    """
    Run one LLM conversation, executing tool calls until the LLM stops
    requesting them.

    Steps:
    1. Send the conversation and the tool schemas to the LLM.
    2. While the reply contains tool calls, execute each through
       ``mcp_client`` and feed the results back to the LLM.
    3. Return all collected text joined by newlines.

    Args:
        mcp_client: Object exposing ``call_tool(name, args)`` that returns an
            awaitable whose result has a ``content`` attribute.
        query: User's input question or command.
        tools: Iterable of tool descriptors exposing ``name``,
            ``description`` and ``inputSchema``.
        messages: Optional prior conversation to place before the user
            query. The list is copied; the caller's list is not mutated.

    Returns:
        The first reply's text (or an empty string), one
        ``[Calling tool ...]`` marker per executed tool call, and any later
        non-empty replies, joined with newlines.
    """
    available_tools = [{
        "type": "function",
        "function": {
            "name": tool.name,
            "description": tool.description,
            "parameters": tool.inputSchema
        }
    } for tool in tools]

    # Honour caller-supplied history instead of silently discarding it.
    messages = list(messages) if messages else []
    messages.append({
        "role": "user",
        "content": query
    })

    # Query the LLM with the conversation and the available tools.
    response = await llm_client.chat.completions.create(
        model=LLM_MODEL,
        messages=messages,
        tools=available_tools
    )

    final_text = []
    message = response.choices[0].message
    final_text.append(message.content or "")

    while message.tool_calls:
        for tool_call in message.tool_calls:
            tool_name = tool_call.function.name
            # A call without arguments may carry an empty string, which
            # json.loads rejects; treat it as "no arguments".
            tool_args = json.loads(tool_call.function.arguments or "{}")

            result = await mcp_client.call_tool(tool_name, tool_args)
            final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")

            # Record the call and its result so the LLM sees the outcome.
            messages.append({
                "role": "assistant",
                "tool_calls": [
                    {
                        "id": tool_call.id,
                        "type": "function",
                        "function": {
                            "name": tool_name,
                            "arguments": json.dumps(tool_args)
                        }
                    }
                ]
            })
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": str(result.content)
            })

        response = await llm_client.chat.completions.create(
            model=LLM_MODEL,
            messages=messages,
            tools=available_tools
        )

        message = response.choices[0].message
        if message.content:
            final_text.append(message.content)

    return "\n".join(final_text)


async def import_knowledge(path):
    """
    Read a text file, split it into chunks and let the LLM store each one.

    For every chunk the LLM is first asked to analyse the content. The reply
    is expected to be a JSON object with optional ``Chunks`` and ``FAQs``
    lists; each entry is then stored through a follow-up LLM conversation.
    Chunks whose analysis reply is not a JSON object are reported and
    skipped so that one bad reply does not abort the whole import.

    Args:
        path: Path of the text file to import.
    """
    # Close the file deterministically rather than relying on the GC.
    with open(path, 'r') as knowledge_file:
        knowledge_text = knowledge_file.read()
    chunks = chunk.to_chunks(knowledge_text, 2000)

    async with MCPClient(MCP_SERVER_HOST) as mcp_client:
        # Get the server's tools and prepare them for the LLM.
        tools = await mcp_client.get_available_tools()

        print('Total Chunks: %d' % len(chunks))
        for index, content in enumerate(chunks):
            print('Processing chunk %d.' % index)
            query = analysis_content_prompt_template % content
            response = await agent_loop(mcp_client, query, tools)

            try:
                analysis = json.loads(response)
            except ValueError as err:
                # json.JSONDecodeError is a ValueError; LLM output is not
                # guaranteed to be JSON, so skip the chunk but say why.
                print('Skipping chunk %d: response is not valid JSON (%s).'
                      % (index, err))
                continue

            if not isinstance(analysis, dict):
                print('Skipping chunk %d: response is not a JSON object.'
                      % index)
                continue

            for knowledge in analysis.get('Chunks') or []:
                q = store_knowledge_prompt_template % knowledge
                response = await agent_loop(mcp_client, q, tools)
                print(response)

            for faq in analysis.get('FAQs') or []:
                q = store_faq_prompt_template % (faq['Question'], faq['Answer'])
                response = await agent_loop(mcp_client, q, tools)
                print(response)


async def _run_single_prompt(prompt: str):
    """Connect to the MCP server, run one agent conversation, print the reply."""
    async with MCPClient(MCP_SERVER_HOST) as mcp_client:
        # Get the server's tools and prepare them for the LLM.
        tools = await mcp_client.get_available_tools()
        response = await agent_loop(mcp_client, prompt, tools)
        print(response)


async def search_knowledge(query):
    """Wrap ``query`` in the search prompt template and print the LLM's reply.

    Args:
        query: Text substituted into ``search_prompt_template``.
    """
    await _run_single_prompt(search_prompt_template % query)


async def chat(query):
    """Wrap ``query`` in the chat prompt template and print the LLM's reply.

    Args:
        query: Text substituted into ``chat_prompt_template``.
    """
    await _run_single_prompt(chat_prompt_template % query)


async def main():
    """Dispatch ``sys.argv`` to the import, search or chat command.

    Exactly two arguments are required after the script name: the command
    and its single argument. Anything else prints the usage line and
    returns without running a command.
    """
    usage = "Usage: python knowledge_manager.py import/search/chat <args>"

    # Expect exactly: script name, command, one argument.
    if len(sys.argv) != 3:
        print(usage)
        return

    command = sys.argv[1]
    args = sys.argv[2]

    handlers = {
        'import': import_knowledge,
        'search': search_knowledge,
        'chat': chat,
    }
    handler = handlers.get(command)
    if handler is None:
        print(usage)
        return

    await handler(args)


if __name__ == "__main__":
    asyncio.run(main())