in agents/utils/connections.py [0:0]
def create_mcp_connection(config: dict[str, Any]) -> MCPConnection:
"""Factory function to create the appropriate MCP connection."""
conn_type = config.get("type", "stdio").lower()
if conn_type == "stdio":
if not config.get("command"):
raise ValueError("Command is required for STDIO connections")
return MCPConnectionStdio(
command=config["command"],
args=config.get("args"),
env=config.get("env"),
)
elif conn_type == "sse":
if not config.get("url"):
raise ValueError("URL is required for SSE connections")
return MCPConnectionSSE(
url=config["url"], headers=config.get("headers")
)
else:
raise ValueError(f"Unsupported connection type: {conn_type}")