src/adb_mysql_mcp_server/server.py (194 lines of code) (raw):
import asyncio
import os
import pymysql
from mcp.server import Server
from mcp.types import Resource, ResourceTemplate, Tool, TextContent
from pydantic import AnyUrl
def get_db_config():
config = {
"host": os.getenv("ADB_MYSQL_HOST", "localhost"),
"port": int(os.getenv("ADB_MYSQL_PORT", 3306)),
"user": os.getenv("ADB_MYSQL_USER"),
"password": os.getenv("ADB_MYSQL_PASSWORD"),
"database": os.getenv("ADB_MYSQL_DATABASE"),
}
if not all([config["user"], config["password"], config["database"]]):
raise ValueError("Missing required database configuration")
return config
app = Server(
name="adb-mysql-mcp-server",
version="1.0.0"
)
@app.list_resources()
async def list_resources() -> list[Resource]:
return [
Resource(
uri="adbmysql:///databases",
name="All of the databases",
description="Display all of the databases in Adb MySQL",
mimeType="text/plain"
)
]
@app.list_resource_templates()
async def list_resource_templates() -> list[ResourceTemplate]:
return [
ResourceTemplate(
uriTemplate="adbmysql:///{database}/tables",
name="Database Tables",
description="Get all the tables in a specific database",
mimeType="text/plain"
),
ResourceTemplate(
uriTemplate="adbmysql:///{database}/{table}/ddl",
name="Table DDL",
description="Get the DDL script of a table in a specific database",
mimeType="text/plain"
),
ResourceTemplate(
uriTemplate="adbmysql:///config/{key}/value",
name="Database Config",
description="Get the value for a config key in the cluster",
mimeType="text/plain"
),
]
@app.read_resource()
async def read_resource(uri: AnyUrl) -> str:
config = get_db_config()
uri_str = str(uri)
if not uri_str.startswith("adbmysql:///"):
raise ValueError(f"Invalid URI: {uri_str}")
conn = pymysql.connect(**config)
conn.autocommit(True)
cursor = conn.cursor()
try:
if uri_str.startswith("adbmysql:///"):
paths = uri_str[12:].split("/")
if paths[0] == "databases":
query = "show databases;"
cursor.execute(query)
databases = cursor.fetchall()
return "\n".join([database[0] for database in databases])
elif len(paths) == 2 and paths[1] == "tables":
database = paths[0]
query = f"show tables from {database};"
cursor.execute(query)
tables = cursor.fetchall()
return "\n".join([table[0] for table in tables])
elif len(paths) == 3 and paths[2] == "ddl":
database = paths[0]
table = paths[1]
query = f"show create table {database}.{table};"
cursor.execute(query)
ddl = cursor.fetchone()
return ddl[1] if ddl and ddl[1] else f"No DDL Found for {database}.{table}"
elif len(paths) == 3 and paths[0] == "config" and paths[2] == "value":
key = paths[1]
query = f"""show adb_config key={key}"""
cursor.execute(query)
value = cursor.fetchone()
return value[1] if value and value[1] else f"No Config Value Found for {key}"
else:
raise ValueError(f"Invalid mcp resource URI format: {uri_str}")
else:
raise ValueError(f"Invalid resource URI format: {uri_str}")
except pymysql.Error as e:
raise RuntimeError(f"Database error: {str(e)}")
finally:
if cursor:
cursor.close()
if conn.open:
conn.close()
@app.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="execute_sql",
description="Execute a SQL query in the Adb MySQL Cluster",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The SQL query to execute"
}
},
"required": ["query"]
},
),
Tool(
name="get_query_plan",
description="Get the query plan for a SQL query",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The SQL query to analyze"
}
},
"required": ["query"]
},
),
Tool(
name="get_execution_plan",
description="Get the actual execution plan with runtime statistics for a SQL query",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The SQL query to analyze"
}
},
"required": ["query"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""Execute SQL commands."""
config = get_db_config()
if name == "execute_sql":
query = arguments.get("query")
if not query:
raise ValueError("Query is required")
elif name == "get_query_plan":
query = arguments.get("query")
if not query:
raise ValueError("Query is required")
query = f"EXPLAIN {query}"
elif name == "get_execution_plan":
query = arguments.get("query")
if not query:
raise ValueError("Query is required")
query = f"EXPLAIN ANALYZE {query}"
else:
raise ValueError(f"Unknown tool: {name}")
conn = pymysql.connect(**config)
conn.autocommit(True)
cursor = conn.cursor()
try:
# Execute the query
cursor.execute(query)
columns = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
result = [",".join(map(str, row)) for row in rows]
return [TextContent(type="text", text="\n".join([",".join(columns)] + result))]
except Exception as e:
return [TextContent(type="text", text=f"Error executing query: {str(e)}")]
finally:
if cursor:
cursor.close()
if conn.open:
conn.close()
async def main():
from mcp.server.stdio import stdio_server
async with stdio_server() as (read_stream, write_stream):
try:
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
except Exception as e:
raise
if __name__ == "__main__":
asyncio.run(main())