src/lindorm_mcp_server/server.py (102 lines of code) (raw):

import argparse import os from contextlib import asynccontextmanager from typing import AsyncIterator from dotenv import load_dotenv from mcp.server.fastmcp import Context, FastMCP from .utils import * from .lindorm_vector_search import LindormVectorSearchClient from .lindorm_wide_table import LindormWideTableClient class LindormContext: def __init__(self, lindorm_search_client: LindormVectorSearchClient, lindorm_sql_client: LindormWideTableClient): self.lindorm_search_client = lindorm_search_client self.lindorm_sql_client = lindorm_sql_client @asynccontextmanager async def server_lifespan(server: FastMCP) -> AsyncIterator[LindormContext]: """Manage application lifecycle for Lindorm""" config = server.config vector_search_client = LindormVectorSearchClient( search_host=config.get("lindorm_search_host"), ai_host=config.get("lindorm_ai_host"), username=config.get("username"), password=config.get("password"), text_embedding_model=config.get("text_embedding_model") ) sql_client = LindormWideTableClient( table_host=config.get("lindorm_table_host"), username=config.get("username"), password=config.get("password"), database=config.get("table_database") ) try: yield LindormContext(vector_search_client, sql_client) finally: pass mcp = FastMCP("Lindorm", lifespan=server_lifespan, log_level="ERROR") @mcp.tool() def lindorm_retrieve_from_index(index_name: str, query: str, content_field: str, vector_field: str, top_k: int = 5, ctx: Context = None) -> str: """ Retrieve from an existing indexes(or knowledgebase) using both full-text search and vector search, and return the aggregated results :param index_name: the index name, or known as knowledgebase name :param query: the query that you want to search in knowledgebase :param content_field: the text field that store the content text. You can get it from the index structure by lindorm_get_index_mappings tool :param vector_field: the vector field that store the vector index. You can get it from the index structure by lindorm_get_index_mappings tool :param top_k: the result number that you want to return :return: the most relevant content stored in the knowledgebase. """ lindorm_search_client = ctx.request_context.lifespan_context.lindorm_search_client contents = lindorm_search_client.rrf_search(index_name, query, top_k, content_field, vector_field) output = f"The retrieving results for query {query} in knowledgebase {index_name} is\n" output += "\n".join(f"{i + 1}. {content}" for i, content in enumerate(contents)) return output @mcp.tool() def lindorm_get_index_fields(index_name: str, ctx: Context = None) -> str: """ Get the fields info of the indexes(or knowledgebase), especially get the vector stored field and content stored field. :param index_name: the index name, or known as knowledgebase name :return: the index fields information """ lindorm_search_client = ctx.request_context.lifespan_context.lindorm_search_client mapping = lindorm_search_client.get_index_mappings(index_name) fields_info = simplify_mappings(mapping, index_name) output = f"The structure(mapping) of index {index_name} is\n" output += json.dumps(fields_info, indent=2, ensure_ascii=False) return output @mcp.tool() def lindorm_list_all_index(ctx: Context = None) -> str: """ List all the indexes(or knowledgebase) you have. :return: all the indexes(or knowledgebase) you have """ lindorm_search_client = ctx.request_context.lifespan_context.lindorm_search_client all_index = lindorm_search_client.list_indexes() output = "All the knowledgebase you have are\n" output += "\n".join(f"{i + 1}. {index}" for i, index in enumerate(all_index)) return output @mcp.tool() def lindorm_execute_sql(query: str, ctx: Context = None) -> str: """ Execute SQL query on Lindorm database. :param query: The SQL query to execute which start with select :return: the results of executing the sql or prompt when meeting certain types of exception """ lindorm_sql_client = ctx.request_context.lifespan_context.lindorm_sql_client res = lindorm_sql_client.execute_query(query) output = f"The results of executing sql {query} is\n" output += res return output @mcp.tool() def lindorm_show_tables(ctx: Context = None) -> str: """ Get all tables in the Lindorm database :return: the tables in the lindorm database """ lindorm_sql_client = ctx.request_context.lifespan_context.lindorm_sql_client return lindorm_sql_client.show_tables() @mcp.tool() def lindorm_describe_table(table_name: str, ctx: Context = None) -> str: """ Get tables schema in the Lindorm database :param table_name: the table name :return: the tables schema """ lindorm_sql_client = ctx.request_context.lifespan_context.lindorm_sql_client return lindorm_sql_client.describe_table(table_name) def parse_arguments(): parser = argparse.ArgumentParser(description="LINDORM MCP Server") parser.add_argument("--lindorm_instance_id", type=str, help="Lindorm Search Host") parser.add_argument("--using_vpc", type=bool, default=False, help="Whether to use the VPC network") parser.add_argument("--username", type=str, default="root", help="Lindorm username") parser.add_argument("--password", type=str, help="Lindorm password") parser.add_argument("--embedding_model", type=str, help="Text Embedding Model Name") parser.add_argument("--database", type=str, default="default", help="The Lindorm Database to execute sql") return parser.parse_args() def main(): load_dotenv() args = parse_arguments() instance_id = os.environ.get("LINDORM_INSTANCE_ID", args.lindorm_instance_id) using_vpc_env = os.environ.get("USING_VPC_NETWORK") if using_vpc_env is not None: using_vpc = str_to_bool(using_vpc_env) else: using_vpc = args.using_vpc mcp.config = { "lindorm_search_host": get_lindorm_search_host(instance_id, using_vpc), "lindorm_ai_host": get_lindorm_ai_host(instance_id, using_vpc), "lindorm_table_host": get_lindorm_table_host(instance_id, using_vpc), "username": os.environ.get("USERNAME", args.username), "password": os.environ.get("PASSWORD", args.password), "text_embedding_model": os.environ.get("TEXT_EMBEDDING_MODEL", args.embedding_model), "table_database": os.environ.get("TABLE_DATABASE", args.database) } mcp.run() if __name__ == "__main__": main()