gemini/mcp/build_mcp_server_by_gemini.ipynb

{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "id": "ur8xi4C7S06n" }, "outputs": [], "source": [ "# Copyright 2025 Google LLC\n", "#\n", "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": { "id": "JAPoU8Sm5E6e" }, "source": [ "# Use Gemini to develop Model Context Protocol (MCP) Server\n", "\n", "\n", "\n", "<table align=\"left\">\n", "<td style=\"text-align: center\">\n", " <a href=\"https://colab.research.google.com/github/GoogleCloudPlatform/generative-ai/blob/main/gemini/mcp/build_mcp_server_by_gemini.ipynb\">\n", " <img width=\"32px\" src=\"https://www.gstatic.com/pantheon/images/bigquery/welcome_page/colab-logo.svg\" alt=\"Google Colaboratory logo\"><br> Open in Colab\n", " </a>\n", " </td>\n", " <td style=\"text-align: center\">\n", " <a href=\"https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fgenerative-ai%2Fmain%2Fgemini%2Fmcp%2Fbuild_mcp_server_by_gemini.ipynb\">\n", " <img width=\"32px\" src=\"https://lh3.googleusercontent.com/JmcxdQi-qOpctIvWKgPtrzZdJJK-J3sWE1RsfjZNwshCFgE_9fULcNpuXYTilIR2hjwN\" alt=\"Google Cloud Colab Enterprise logo\"><br> Open in Colab Enterprise\n", " </a>\n", " </td>\n", " <td style=\"text-align: center\">\n", " <a href=\"https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/generative-ai/main/gemini/mcp/build_mcp_server_by_gemini.ipynb\">\n", " <img src=\"https://www.gstatic.com/images/branding/gcpiconscolors/vertexai/v1/32px.svg\" alt=\"Vertex AI logo\"><br> Open in Vertex AI Workbench\n", " </a>\n", " </td>\n", " \n", " \n", " <td style=\"text-align: center\">\n", " <a href=\"https://colab.research.google.com/github/GoogleCloudPlatform/generative-ai/blob/main/gemini/mcp/build_mcp_server_by_gemini.ipynb\">\n", " <img width=\"32px\" src=\"https://www.svgrepo.com/download/217753/github.svg\" alt=\"GitHub logo\"><br> View on GitHub\n", " </a>\n", " </td>\n", "</table>\n", "\n", "<div style=\"clear: both;\"></div>\n", "\n", "<b>Share to:</b>\n", "\n", "<a href=\"https://www.linkedin.com/sharing/share-offsite/?url=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/mcp/build_mcp_server_by_gemini.ipynb\" target=\"_blank\">\n", " <img width=\"20px\" src=\"https://upload.wikimedia.org/wikipedia/commons/8/81/LinkedIn_icon.svg\" alt=\"LinkedIn logo\">\n", "</a>\n", "\n", "<a href=\"https://bsky.app/intent/compose?text=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/mcp/build_mcp_server_by_gemini.ipynb\" target=\"_blank\">\n", " <img width=\"20px\" src=\"https://upload.wikimedia.org/wikipedia/commons/7/7a/Bluesky_Logo.svg\" alt=\"Bluesky logo\">\n", "</a>\n", "\n", "<a href=\"https://twitter.com/intent/tweet?url=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/mcp/build_mcp_server_by_gemini.ipynb\" target=\"_blank\">\n", " <img width=\"20px\" 
src=\"https://upload.wikimedia.org/wikipedia/commons/5/5a/X_icon_2.svg\" alt=\"X logo\">\n", "</a>\n", "\n", "<a href=\"https://reddit.com/submit?url=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/mcp/build_mcp_server_by_gemini.ipynb\" target=\"_blank\">\n", " <img width=\"20px\" src=\"https://redditinc.com/hubfs/Reddit%20Inc/Brand/Reddit_Logo.png\" alt=\"Reddit logo\">\n", "</a>\n", "\n", "<a href=\"https://www.facebook.com/sharer/sharer.php?u=https%3A//github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/mcp/build_mcp_server_by_gemini.ipynb\" target=\"_blank\">\n", " <img width=\"20px\" src=\"https://upload.wikimedia.org/wikipedia/commons/5/51/Facebook_f_logo_%282019%29.svg\" alt=\"Facebook logo\">\n", "</a>" ] }, { "cell_type": "markdown", "metadata": { "id": "84f0f73a0f76" }, "source": [ "| Author |\n", "| --- |\n", "| [Dave Wang](https://github.com/wadave) |" ] }, { "cell_type": "markdown", "metadata": { "id": "tvgnzT1CKxrO" }, "source": [ "## Overview\n", "The Model Context Protocol (MCP) is an open standard that simplifies how AI assistants connect with external data, tools, and systems. It achieves this by standardizing the way applications provide contextual information to Large Language Models (LLMs), creating a vital interface for models to interact directly with various external services.\n", "\n", "Developers building MCP-enabled applications have the flexibility to utilize existing third-party MCP servers or implement their own custom server solutions.\n", "\n", "This notebook focuses on the latter, demonstrating how to build custom MCP servers using Gemini. We will walk through code generation and testing for three specific examples:\n", "\n", "#### MCP server code generation:\n", "- Example 1: Creating a BigQuery MCP Server\n", "- Example 2: Creating a MedlinePlus MCP Server\n", "- Example 3: Creating an NIH MCP Server\n", "\n", "#### MCP server code testing:\n", "- Option 1: Use LangChain MCP Adaptor\n", "- Option 2: Build your own agent" ] }, { "cell_type": "markdown", "metadata": { "id": "61RBz8LLbxCR" }, "source": [ "## Get started" ] }, { "cell_type": "markdown", "metadata": { "id": "No17Cw5hgx12" }, "source": [ "### Install Google Gen AI SDK and other required packages\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "tFy3H3aPgx12" }, "outputs": [], "source": [ "%pip install --upgrade --quiet google-genai google-cloud-secret-manager mcp black google-cloud-bigquery langchain-mcp-adapters langchain langchain-google-vertexai langgraph" ] }, { "cell_type": "markdown", "metadata": { "id": "dmWOrTJ3gx13" }, "source": [ "### Authenticate your notebook environment (Colab only)\n", "\n", "If you're running this notebook on Google Colab, run the cell below to authenticate your environment." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "NyKGtVQjgx13" }, "outputs": [], "source": [ "import sys\n", "\n", "if \"google.colab\" in sys.modules:\n", " from google.colab import auth\n", "\n", " auth.authenticate_user()" ] }, { "cell_type": "markdown", "metadata": { "id": "dd2d67b1696a" }, "source": [ "### Import Libraries" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "5558cd94d971" }, "outputs": [], "source": [ "import json\n", "import os\n", "import re\n", "import sys\n", "from typing import Any\n", "\n", "from IPython.display import Markdown, display\n", "import black\n", "from google import genai\n", "from google.cloud import aiplatform\n", "from google.genai import types\n", "from google.genai.types import (\n", " GenerateContentConfig,\n", ")\n", "from langchain_core.messages import AIMessage, HumanMessage, ToolMessage\n", "from langchain_google_vertexai import ChatVertexAI\n", "from langchain_mcp_adapters.client import MultiServerMCPClient\n", "from langgraph.prebuilt import create_react_agent\n", "from mcp import ClientSession, StdioServerParameters\n", "from mcp.client.stdio import stdio_client\n", "import requests" ] }, { "cell_type": "markdown", "metadata": { "id": "8cc5e5e6e055" }, "source": [ "### Helper functions" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "adc9451337a5" }, "outputs": [], "source": [ "def get_url_content(url: str) -> str:\n", " try:\n", " # Send an HTTP GET request to the URL\n", " response = requests.get(url)\n", "\n", " # Raise an exception if the request returned an error status code (like 404 or 500)\n", " response.raise_for_status()\n", "\n", " # Get the content of the response as text (HTML, in this case)\n", " # 'requests' automatically decodes the content based on HTTP headers\n", " file_content = response.text\n", "\n", " # Now you can work with the content\n", " print(\"Successfully fetched content\")\n", " return file_content\n", " # Or, save it to a file:\n", " # with open(\"server_page.html\", \"w\", encoding=\"utf-8\") as f:\n", " # f.write(file_content)\n", " # print(\"Content saved to server_page.html\")\n", "\n", " except requests.exceptions.RequestException as e:\n", " # Handle potential errors during the request (e.g., network issues, DNS errors)\n", " print(f\"Error fetching URL {url}: {e}\")\n", " except requests.exceptions.HTTPError as e:\n", " # Handle HTTP error responses (e.g., 404 Not Found, 503 Service Unavailable)\n", " print(f\"HTTP Error for {url}: {e}\")\n", "\n", "\n", "def format_python(raw_code: str, output_filename: str) -> str:\n", "\n", " try:\n", " # Format the code string using black\n", " # Use default FileMode which is generally recommended\n", " formatted_code = black.format_str(raw_code, mode=black.FileMode())\n", "\n", " # Save the formatted code to the specified file\n", " with open(output_filename, \"w\", encoding=\"utf-8\") as f:\n", " f.write(formatted_code)\n", "\n", " print(f\"Successfully formatted the code and saved it to '{output_filename}'\")\n", "\n", " except black.InvalidInput as e:\n", " print(\n", " f\"Error formatting code: The input string does not seem to be valid Python syntax.\"\n", " )\n", " print(f\"Details: {e}\")\n", " except Exception as e:\n", " print(f\"An error occurred while writing the file: {e}\")\n", "\n", "\n", "def extract_json_from_string(input_str: str) -> dict | list | None:\n", " \"\"\"\n", " Extracts JSON data from a string, handling potential variations.\n", "\n", " This function attempts 
to find JSON data within a string. It specifically\n", " looks for JSON enclosed in Markdown-like code fences (```json ... ```).\n", " If such a block is found, it extracts and parses the content.\n", " If no code block is found, it attempts to parse the entire input string\n", " as JSON.\n", "\n", " Args:\n", " input_str: The string potentially containing JSON data. It might be\n", " a plain JSON string or contain a Markdown code block\n", " with JSON, possibly preceded by other explanatory text.\n", "\n", " Returns:\n", " The parsed JSON object (typically a dictionary or list) if valid\n", " JSON is found and successfully parsed.\n", " Returns None if no valid JSON is found, if parsing fails, or if the\n", " input is not a string.\n", " \"\"\"\n", " if not isinstance(input_str, str):\n", " # Handle cases where input is not a string\n", " return None\n", "\n", " # Pattern to find JSON within ```json ... ``` blocks\n", " # - ````json`: Matches the start fence.\n", " # - `\\s*`: Matches any leading whitespace after the fence marker.\n", " # - `(.*?)`: Captures the content (non-greedily) between the fences. This is group 1.\n", " # - `\\s*`: Matches any trailing whitespace before the end fence.\n", " # - ` ``` `: Matches the end fence.\n", " # - `re.DOTALL`: Allows '.' to match newline characters.\n", " pattern = r\"```json\\s*(.*?)\\s*```\"\n", " match = re.search(pattern, input_str, re.DOTALL)\n", "\n", " json_string_to_parse = None\n", "\n", " if match:\n", " # If a markdown block is found, extract its content\n", " json_string_to_parse = match.group(\n", " 1\n", " ).strip() # Get captured group and remove surrounding whitespace\n", " else:\n", " # If no markdown block, assume the *entire* input might be JSON\n", " # We strip whitespace in case the string is just JSON with padding\n", " json_string_to_parse = input_str.strip()\n", "\n", " if not json_string_to_parse:\n", " # If after stripping, the potential JSON string is empty, return None\n", " return None\n", "\n", " try:\n", " # Attempt to parse the determined string (either from block or whole input)\n", " parsed_json = json.loads(json_string_to_parse)\n", " return parsed_json\n", " except json.JSONDecodeError:\n", " # Parsing failed, indicating the string wasn't valid JSON\n", " return None\n", " except Exception as e:\n", " # Catch other potential unexpected errors during parsing\n", " print(f\"An unexpected error occurred during JSON parsing: {e}\")\n", " return None" ] }, { "cell_type": "markdown", "metadata": { "id": "DF4l8DTdWgPY" }, "source": [ "### Set up Vertex AI\n", "\n", "To get started using Vertex AI, you must have an existing Google Cloud project and [enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com).\n", "\n", "Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment)."
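] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Optionally, you can sanity-check that the Vertex AI API is enabled from inside the notebook. This is a minimal check that assumes the `gcloud` CLI is installed and authenticated in your environment:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Optional sanity check (assumes the gcloud CLI is available and authenticated).\n", "# Lists enabled services and filters for the Vertex AI API.\n", "!gcloud services list --enabled | grep aiplatform"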
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Nqwi-5ufWp_B" }, "outputs": [], "source": [ "# Use the environment variable if the user doesn't provide Project ID.\n", "PROJECT_ID = \"[your-project-id]\" # @param {type: \"string\", placeholder: \"[your-project-id]\", isTemplate: true}\n", "if not PROJECT_ID or PROJECT_ID == \"[your-project-id]\":\n", " PROJECT_ID = str(os.environ.get(\"GOOGLE_CLOUD_PROJECT\"))\n", "\n", "LOCATION = os.environ.get(\"GOOGLE_CLOUD_REGION\", \"us-central1\")" ] }, { "cell_type": "markdown", "metadata": { "id": "9303e8f04b36" }, "source": [ "## Set up Gemini client" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "ce653f48ea8a" }, "outputs": [], "source": [ "client = genai.Client(vertexai=True, project=PROJECT_ID, location=LOCATION)\n", "\n", "MODEL_ID = (\n", " \"gemini-2.5-pro-preview-03-25\" # This model is only for MCP server code generation.\n", ")" ] }, { "cell_type": "markdown", "metadata": { "id": "28dd4e628b73" }, "source": [ "### Get system instruction context info" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "9ca70637659a" }, "outputs": [], "source": [ "# The URL you want to fetch\n", "url = \"https://modelcontextprotocol.io/quickstart/server\"\n", "reference_content = get_url_content(url)" ] }, { "cell_type": "markdown", "metadata": { "id": "1c2e6d34095a" }, "source": [ "### Set up system instruction" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "af9d590e9798" }, "outputs": [], "source": [ "from pydantic import BaseModel\n", "\n", "\n", "class ResponseSchema(BaseModel):\n", " python_code: str\n", " description: str\n", "\n", "\n", "system_instruction = f\"\"\"\n", " You are an MCP server export.\n", " Your mission is to write python code for MCP server.\n", " Here's the MCP server development guide and example\n", " {reference_content}\n", " \n", "\"\"\"" ] }, { "cell_type": "markdown", "metadata": { "id": "aedb16d3eb9a" }, "source": [ "#### Define a function to generate MCP server code" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "53eda74134e3" }, "outputs": [], "source": [ "def generate_mcp_server(prompt):\n", " response = client.models.generate_content(\n", " model=MODEL_ID,\n", " contents=prompt,\n", " config=GenerateContentConfig(\n", " system_instruction=system_instruction,\n", " response_mime_type=\"application/json\",\n", " response_schema=ResponseSchema,\n", " ),\n", " )\n", "\n", " return response.text" ] }, { "cell_type": "markdown", "metadata": { "id": "5f66609b4467" }, "source": [ "## Generate MCP Server Code" ] }, { "cell_type": "markdown", "metadata": { "id": "9290b62139fc" }, "source": [ "### Example 1: Create MCP Server for Google Cloud BigQuery" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "5541f11eb3b1" }, "outputs": [], "source": [ "prompt = \"\"\"\n", " Please create an MCP server code for Google Cloud BigQuery. It has two tools. One is to list tables for all datasets, the other is to describe a table. Google Cloud project id and location will be provided for use. 
please use the project ID to access the BigQuery client.\n", " Please output JSON only.\n", " \n", "\"\"\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "185e11eaf838" }, "outputs": [], "source": [ "response_text = generate_mcp_server(prompt)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "640516444024" }, "outputs": [], "source": [ "python_code = extract_json_from_string(response_text)[\"python_code\"]" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "fc6460fddb0f" }, "outputs": [], "source": [ "format_python(python_code, \"server/bq.py\")" ] }, { "cell_type": "markdown", "metadata": { "id": "1bcd68fb477b" }, "source": [ "### Example 2: Create MCP Server for the MedlinePlus website\n", "Create an MCP server for the\n", "https://medlineplus.gov/about/developers/webservices/ API service" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "164b2b133dc7" }, "outputs": [], "source": [ "med_url = \"https://medlineplus.gov/about/developers/webservices/\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "ac6658d2216c" }, "outputs": [], "source": [ "prompt_base = \"\"\"\n", " Please create MCP server code for https://medlineplus.gov/about/developers/webservices/. It has one tool, get_medical_term. Given a medical term, this tool returns an explanation of the medical term.\n", " \n", " Here's the API details:\n", " \n", "\"\"\"\n", "\n", "prompt = [prompt_base, types.Part.from_uri(file_uri=med_url, mime_type=\"text/html\")]\n", "response_text = generate_mcp_server(prompt)\n", "python_code = extract_json_from_string(response_text)[\"python_code\"]\n", "\n", "format_python(python_code, \"server/med.py\")" ] }, { "cell_type": "markdown", "metadata": { "id": "03bf9c1817d8" }, "source": [ "### Example 3: Create MCP Server for NIH" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "e20729a6b878" }, "outputs": [], "source": [ "nih_url = \"https://clinicaltables.nlm.nih.gov/apidoc/icd10cm/v3/doc.html\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "3d999a2cfc6d" }, "outputs": [], "source": [ "prompt_base = \"\"\"\n", " Please create MCP server code for NIH. It has one tool, get_icd_10_code. Given a name or code, it returns the top 5 results. \n", " \n", " Here's the API details:\n", "\n", "\"\"\"\n", "prompt = [prompt_base, types.Part.from_uri(file_uri=nih_url, mime_type=\"text/html\")]\n", "response_text = generate_mcp_server(prompt)\n", "python_code = extract_json_from_string(response_text)[\"python_code\"]\n", "\n", "format_python(python_code, \"server/nih.py\")" ] }, { "cell_type": "markdown", "metadata": { "id": "512095cbf243" }, "source": [ "## Testing MCP Servers" ] }, { "cell_type": "markdown", "metadata": { "id": "d1b5e39b0331" }, "source": [ "### Option 1: Use the LangChain MCP Adapter to test MCP servers\n", "This works in Jupyter notebooks but may give errors in Colab. If you see errors in Colab, use Option 2. 
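\n\nUnder the hood, the adapter launches each generated server script as a local subprocess (the `command`/`args` pairs below) and communicates with it over stdio, exposing each MCP tool as a LangChain tool that the ReAct agent can call.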
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "de9568b392a1" }, "outputs": [], "source": [ "aiplatform.init(\n", " project=PROJECT_ID,\n", " location=LOCATION,\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "60dc63bffb2d" }, "outputs": [], "source": [ "llm = ChatVertexAI(\n", " model=\"gemini-2.5-pro-preview-03-25\",\n", " temperature=0,\n", " max_tokens=None,\n", " max_retries=6,\n", " stop=None,\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "06661187dbb0" }, "outputs": [], "source": [ "server_configs = {\n", " \"nih\": {\n", " \"command\": \"python\",\n", " \"args\": [\"./server/nih.py\"],\n", " \"transport\": \"stdio\",\n", " },\n", " \"med\": {\n", " \"command\": \"python\",\n", " \"args\": [\"./server/med.py\"],\n", " \"transport\": \"stdio\",\n", " },\n", " \"bq\": {\n", " \"command\": \"python\",\n", " \"args\": [\"./server/bq.py\"],\n", " \"transport\": \"stdio\",\n", " },\n", "}" ] }, { "cell_type": "markdown", "metadata": { "id": "d431e3aced9a" }, "source": [ "Set up MCP Client using LangChain MCP Adaptor " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "5b95c16103df" }, "outputs": [], "source": [ "async def run_lc_react_agent(server_configs, message):\n", " async with MultiServerMCPClient(server_configs) as client:\n", " agent = create_react_agent(llm, client.get_tools())\n", "\n", " agent_response = await agent.ainvoke({\"messages\": message})\n", " for response in agent_response[\"messages\"]:\n", " user = \"\"\n", "\n", " if isinstance(response, HumanMessage):\n", " user = \"[User]\"\n", " elif isinstance(response, ToolMessage):\n", " user = \"-Tool-\"\n", " elif isinstance(response, AIMessage):\n", " user = \"[Agent]\"\n", "\n", " if isinstance(response.content, list):\n", " display(Markdown(f'{user}: {response.content[0].get(\"text\", \"\")}'))\n", " continue\n", " display(Markdown(f\"{user}: {response.content}\"))" ] }, { "cell_type": "markdown", "metadata": { "id": "0f2da61115d3" }, "source": [ "#### Example 1: BigQuery MCP server testing" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "be6b7142ecbb" }, "outputs": [], "source": [ "await run_lc_react_agent(\n", " server_configs, \"Please list my BigQuery tables for project 'dw-genai-dev'\"\n", ")" ] }, { "cell_type": "markdown", "metadata": { "id": "4b5d9b718d33" }, "source": [ "#### Example 2: MedlinePlus MCP server testing" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "6b246992387a" }, "outputs": [], "source": [ "await run_lc_react_agent(server_configs, \"Please explain flu in details\")" ] }, { "cell_type": "markdown", "metadata": { "id": "bbbcbd30e41c" }, "source": [ "#### Example 3: NIH MCP Server testing" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "0020c5836140" }, "outputs": [], "source": [ "await run_lc_react_agent(server_configs, \"can you tell me icd-10 code for influenza A?\")" ] }, { "cell_type": "markdown", "metadata": { "id": "9b7ba8bc08c4" }, "source": [ "### Option 2: Build your own agent to test MCP servers" ] }, { "cell_type": "markdown", "metadata": { "id": "001fe3dc26d8" }, "source": [ "##### Gemini Agent\n", "\n", "Within an MCP client session, this agent loop runs a multi-turn conversation loop with a Gemini model, handling tool calls via MCP server.\n", "\n", "This function orchestrates the interaction between a user prompt, a Gemini model capable of function calling, and a session object that 
provides and executes tools. It handles the cycle of:\n", "- Getting the available tool declarations from the MCP client session.\n", "- Sending the user prompt (and conversation history) to the model.\n", "- Executing any tool calls the model requests via the MCP server.\n", "- Sending the tool execution results back to the model.\n", "- Repeating until the model provides a text response or the maximum number of tool execution turns is reached.\n", "- Generating the final response based on the tool responses and the original query.\n", " \n", "MCP integration with Gemini\n", "\n", "<img src=\"https://storage.googleapis.com/github-repo/generative-ai/gemini/mcp/mcp_tool_call.png\" alt=\"MCP with Gemini\" height=\"700\">" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "29d7dcb4d2d4" }, "outputs": [], "source": [ "# --- Configuration ---\n", "# Consider using a more recent/recommended model if available and suitable\n", "\n", "DEFAULT_MAX_TOOL_TURNS = 5 # Maximum consecutive turns for tool execution\n", "DEFAULT_INITIAL_TEMPERATURE = (\n", " 0.0 # Temperature for the first LLM call (more deterministic)\n", ")\n", "DEFAULT_TOOL_CALL_TEMPERATURE = (\n", " 1.0 # Temperature for LLM calls after tool use (potentially more creative)\n", ")\n", "\n", "# Make tool calls via MCP Server\n", "\n", "\n", "async def _execute_tool_calls(\n", " function_calls: list[types.FunctionCall], session: ClientSession\n", ") -> list[types.Part]:\n", " \"\"\"\n", " Executes a list of function calls requested by the Gemini model via the session.\n", "\n", " Args:\n", " function_calls: A list of FunctionCall objects from the model's response.\n", " session: The session object capable of executing tools via `call_tool`.\n", "\n", " Returns:\n", " A list of Part objects, each containing a FunctionResponse corresponding\n", " to the execution result of a requested tool call.\n", " \"\"\"\n", " tool_response_parts: list[types.Part] = []\n", " print(f\"--- Executing {len(function_calls)} tool call(s) ---\")\n", "\n", " for func_call in function_calls:\n", " tool_name = func_call.name\n", " # Ensure args is a dictionary, even if missing or not a dict type\n", " args = func_call.args if isinstance(func_call.args, dict) else {}\n", " print(f\" Attempting to call session tool: '{tool_name}' with args: {args}\")\n", "\n", " tool_result_payload: dict[str, Any]\n", " try:\n", " # Execute the tool using the provided session object\n", " # Assumes session.call_tool returns an object with attributes\n", " # like `isError` (bool) and `content` (list of Part-like objects).\n", " tool_result = await session.call_tool(tool_name, args)\n", " print(f\" Session tool '{tool_name}' execution finished.\")\n", "\n", " # Extract result or error message from the tool result object\n", " result_text = \"\"\n", " # Check structure carefully based on actual `session.call_tool` return type\n", " if (\n", " hasattr(tool_result, \"content\")\n", " and tool_result.content\n", " and hasattr(tool_result.content[0], \"text\")\n", " ):\n", " result_text = tool_result.content[0].text or \"\"\n", "\n", " if hasattr(tool_result, \"isError\") and tool_result.isError:\n", " error_message = (\n", " result_text\n", " or f\"Tool '{tool_name}' failed without specific error message.\"\n", " )\n", " print(f\" Tool '{tool_name}' reported an error: {error_message}\")\n", " tool_result_payload = {\"error\": error_message}\n", " else:\n", " print(\n", " f\" Tool '{tool_name}' succeeded. 
Result snippet: {result_text[:150]}...\"\n", " ) # Log snippet\n", " tool_result_payload = {\"result\": result_text}\n", "\n", " except Exception as e:\n", " # Catch exceptions during the tool call itself\n", " error_message = f\"Tool execution framework failed: {type(e).__name__}: {e}\"\n", " print(f\" Error executing tool '{tool_name}': {error_message}\")\n", " tool_result_payload = {\"error\": error_message}\n", "\n", " # Create a FunctionResponse Part to send back to the model\n", " tool_response_parts.append(\n", " types.Part.from_function_response(\n", " name=tool_name, response=tool_result_payload\n", " )\n", " )\n", " print(\"--- Finished executing tool call(s) ---\")\n", " return tool_response_parts\n", "\n", "\n", "async def run_agent_loop(\n", " prompt: str,\n", " client: genai.Client,\n", " session: ClientSession,\n", " model_id: str = MODEL_ID,\n", " max_tool_turns: int = DEFAULT_MAX_TOOL_TURNS,\n", " initial_temperature: float = DEFAULT_INITIAL_TEMPERATURE,\n", " tool_call_temperature: float = DEFAULT_TOOL_CALL_TEMPERATURE,\n", ") -> types.GenerateContentResponse:\n", " \"\"\"\n", " Runs a multi-turn conversation loop with a Gemini model, handling tool calls.\n", "\n", " This function orchestrates the interaction between a user prompt, a Gemini\n", " model capable of function calling, and a session object that provides\n", " and executes tools. It handles the cycle of:\n", " 1. Sending the user prompt (and conversation history) to the model.\n", " 2. If the model requests tool calls, executing them via the `session`.\n", " 3. Sending the tool execution results back to the model.\n", " 4. Repeating until the model provides a text response or the maximum\n", " number of tool execution turns is reached.\n", "\n", " Args:\n", " prompt: The initial user prompt to start the conversation.\n", " client: An initialized Gemini client (genai.Client) object.\n", "\n", " session: An active session object responsible for listing available tools\n", " via `list_tools()` and executing them via `call_tool(tool_name, args)`.\n", " It's also expected to have an `initialize()` method.\n", " model_id: The identifier of the Gemini model to use (e.g., \"gemini-2.0-flash\").\n", " max_tool_turns: The maximum number of consecutive turns dedicated to tool calls\n", " before forcing a final response or exiting.\n", " initial_temperature: The temperature setting for the first model call.\n", " tool_call_temperature: The temperature setting for subsequent model calls\n", " that occur after tool execution.\n", "\n", " Returns:\n", " The final response from the Gemini model after the\n", " conversation loop concludes (either with a text response or after\n", " reaching the max tool turns).\n", "\n", " Raises:\n", " ValueError: If the session object does not provide any tools.\n", " Exception: Can potentially raise exceptions from the underlying API calls\n", " or session tool execution if not caught internally by `_execute_tool_calls`.\n", " \"\"\"\n", " print(\n", " f\"Starting agent loop with model '{model_id}' and prompt: '{prompt[:100]}...'\"\n", " )\n", "\n", " # Initialize conversation history with the user's prompt\n", " contents: list[types.Content] = [\n", " types.Content(role=\"user\", parts=[types.Part(text=prompt)])\n", " ]\n", "\n", " # Ensure the session is ready (if needed)\n", " if hasattr(session, \"initialize\") and callable(session.initialize):\n", " print(\"Initializing session...\")\n", " await session.initialize()\n", " else:\n", " print(\"Session object does not have an initialize() 
method, proceeding anyway.\")\n", "\n", " # --- 1. Discover Tools from Session ---\n", " print(\"Listing tools from session...\")\n", " # Assumes session.list_tools() returns an object with a 'tools' attribute (list)\n", " # Each item in the list should have 'name', 'description', and 'inputSchema' attributes.\n", " session_tool_list = await session.list_tools()\n", "\n", " if not session_tool_list or not session_tool_list.tools:\n", " raise ValueError(\"No tools provided by the session. Agent loop cannot proceed.\")\n", "\n", " # Convert session tools to the format required by the Gemini API\n", " gemini_tool_config = types.Tool(\n", " function_declarations=[\n", " types.FunctionDeclaration(\n", " name=tool.name,\n", " description=tool.description,\n", " parameters=tool.inputSchema, # Assumes inputSchema is compatible\n", " )\n", " for tool in session_tool_list.tools\n", " ]\n", " )\n", " print(\n", " f\"Configured Gemini with {len(gemini_tool_config.function_declarations)} tool(s).\"\n", " )\n", "\n", " # --- 2. Initial Model Call ---\n", " print(\"Making initial call to Gemini model...\")\n", " current_temperature = initial_temperature\n", " response = await client.aio.models.generate_content(\n", " model=model_id,\n", " contents=contents,\n", " config=types.GenerateContentConfig(\n", " temperature=current_temperature,\n", " tools=[gemini_tool_config],\n", " ),\n", " )\n", " print(\"Initial response received.\")\n", "\n", " # Append the model's first response (potentially including function calls) to history\n", " # Need to handle potential lack of candidates or content\n", " if not response.candidates:\n", " print(\"Warning: Initial model response has no candidates.\")\n", " # Decide how to handle this - raise error or return the empty response?\n", " return response\n", " contents.append(response.candidates[0].content)\n", "\n", " # --- 3. 
Tool Calling Loop ---\n", " turn_count = 0\n", " # Check specifically for FunctionCall objects in the latest response part\n", " latest_content = response.candidates[0].content\n", " has_function_calls = any(part.function_call for part in latest_content.parts)\n", "\n", " while has_function_calls and turn_count < max_tool_turns:\n", " turn_count += 1\n", " print(f\"\\n--- Tool Turn {turn_count}/{max_tool_turns} ---\")\n", "\n", " # --- 3.1 Execute Pending Function Calls ---\n", " function_calls_to_execute = [\n", " part.function_call for part in latest_content.parts if part.function_call\n", " ]\n", " tool_response_parts = await _execute_tool_calls(\n", " function_calls_to_execute, session\n", " )\n", "\n", " # --- 3.2 Add Tool Responses to History ---\n", " # Send back the results for *all* function calls from the previous turn\n", " contents.append(\n", " types.Content(role=\"function\", parts=tool_response_parts)\n", " ) # Use \"function\" role\n", " print(f\"Added {len(tool_response_parts)} tool response part(s) to history.\")\n", "\n", " # --- 3.3 Make Subsequent Model Call with Tool Responses ---\n", " print(\"Making subsequent API call to Gemini with tool responses...\")\n", " current_temperature = tool_call_temperature # Use different temp for follow-up\n", " response = await client.aio.models.generate_content(\n", " model=model_id,\n", " contents=contents, # Send updated history\n", " config=types.GenerateContentConfig(\n", " temperature=current_temperature,\n", " tools=[gemini_tool_config],\n", " ),\n", " )\n", " print(\"Subsequent response received.\")\n", "\n", " # --- 3.4 Append latest model response and check for more calls ---\n", " if not response.candidates:\n", " print(\"Warning: Subsequent model response has no candidates.\")\n", " break # Exit loop if no candidates are returned\n", " latest_content = response.candidates[0].content\n", " contents.append(latest_content)\n", " has_function_calls = any(part.function_call for part in latest_content.parts)\n", " if not has_function_calls:\n", " print(\n", " \"Model response contains text, no further tool calls requested this turn.\"\n", " )\n", "\n", " # --- 4. Loop Termination Check ---\n", " if turn_count >= max_tool_turns and has_function_calls:\n", " print(\n", " f\"Maximum tool turns ({max_tool_turns}) reached. Exiting loop even though function calls might be pending.\"\n", " )\n", " elif not has_function_calls:\n", " print(\"Tool calling loop finished naturally (model provided text response).\")\n", "\n", " # --- 5. Return Final Response ---\n", " print(\"Agent loop finished. 
Returning final response.\")\n", " return response" ] }, { "cell_type": "markdown", "metadata": { "id": "f1fffdaac748" }, "source": [ "#### Set up MCP client" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "59b6a27acb04" }, "outputs": [], "source": [ "async def run_simple_agent(server_params, query):\n", " async with stdio_client(server_params) as (\n", " read,\n", " write,\n", " ):\n", " async with ClientSession(\n", " read,\n", " write,\n", " ) as session:\n", " # Test prompt\n", " prompt = query\n", " print(f\"Running agent loop with prompt: {prompt}\")\n", " # Run agent loop\n", " res = await run_agent_loop(prompt, client, session)\n", " return res" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "38a57e271793" }, "outputs": [], "source": [ "bq_server_params = StdioServerParameters(\n", " command=\"python\",\n", " # Make sure to update to the full absolute path to your server file\n", " args=[\"./server/bq.py\"],\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "2f85b0c63710" }, "outputs": [], "source": [ "med_server_params = StdioServerParameters(\n", " command=\"python\",\n", " # Make sure to update to the full absolute path to your server file\n", " args=[\"./server/med.py\"],\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "e8f63ea4fc1e" }, "outputs": [], "source": [ "nih_server_params = StdioServerParameters(\n", " command=\"python\",\n", " # Make sure to update to the full absolute path to your server file\n", " args=[\"./server/nih.py\"],\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "c713c687d705" }, "outputs": [], "source": [ "bq_query = (\n", " \"Please list my BigQuery tables, project id is 'dw-genai-dev', location is 'us'\"\n", ")\n", "bq_res = await run_simple_agent(bq_server_params, bq_query)\n", "print(bq_res.text)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "9d2916bef5bb" }, "outputs": [], "source": [ "med_query = \"Please explain flu in detail.\"\n", "med_res = await run_simple_agent(med_server_params, med_query)\n", "print(med_res.text)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "59440aa49163" }, "outputs": [], "source": [ "nih_query = \"Please tell me the ICD-10 code for pneumonia\"\n", "nih_res = await run_simple_agent(nih_server_params, nih_query)\n", "print(nih_res.text)" ] } ], "metadata": { "colab": { "name": "build_mcp_server_by_gemini.ipynb", "toc_visible": true }, "kernelspec": { "display_name": "Python 3", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 0 }