pkg/ingress/kube/configmap/mcp_server.go (443 lines of code) (raw):
// Copyright (c) 2022 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package configmap
import (
"encoding/json"
"errors"
"fmt"
"reflect"
"strings"
"sync/atomic"
"github.com/alibaba/higress/pkg/ingress/kube/util"
. "github.com/alibaba/higress/pkg/ingress/log"
"github.com/alibaba/higress/registry/reconcile"
networking "istio.io/api/networking/v1alpha3"
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/schema/gvk"
)
// RedisConfig defines the configuration for Redis connection
type RedisConfig struct {
// The address of Redis server in the format of "host:port"
Address string `json:"address,omitempty"`
// The username for Redis authentication
Username string `json:"username,omitempty"`
// The password for Redis authentication
Password string `json:"password,omitempty"`
// The database index to use
DB int `json:"db,omitempty"`
}
// MCPRatelimitConfig defines the configuration for rate limit
type MCPRatelimitConfig struct {
// The limit of the rate limit
Limit int64 `json:"limit,omitempty"`
// The window of the rate limit
Window int64 `json:"window,omitempty"`
// The white list of the rate limit
WhiteList []string `json:"white_list,omitempty"`
}
// SSEServer defines the configuration for Server-Sent Events (SSE) server
type SSEServer struct {
// The name of the SSE server
Name string `json:"name,omitempty"`
// The path where the SSE server will be mounted, the full path is (PATH + SSEPathSuffix)
Path string `json:"path,omitempty"`
// The type of the SSE server
Type string `json:"type,omitempty"`
// Additional Config parameters for the real MCP server implementation
Config map[string]interface{} `json:"config,omitempty"`
// The domain list of the SSE server
DomainList []string `json:"domain_list,omitempty"`
}
// MatchRule defines a rule for matching requests
type MatchRule struct {
// Domain pattern, supports wildcards
MatchRuleDomain string `json:"match_rule_domain,omitempty"`
// Path pattern to match
MatchRulePath string `json:"match_rule_path,omitempty"`
// Type of match rule: exact, prefix, suffix, contains, regex
MatchRuleType string `json:"match_rule_type,omitempty"`
// Type of upstream(s) matched by the rule: rest (default), sse
UpstreamType string `json:"upstream_type"`
// Enable request path rewrite for matched routes
EnablePathRewrite bool `json:"enable_path_rewrite"`
// Prefix the request path would be rewritten to.
PathRewritePrefix string `json:"path_rewrite_prefix"`
}
// McpServer defines the configuration for MCP (Model Context Protocol) server
type McpServer struct {
// Flag to control whether MCP server is enabled
Enable bool `json:"enable,omitempty"`
// Redis Config for MCP server
Redis *RedisConfig `json:"redis,omitempty"`
// The suffix to be appended to SSE paths, default is "/sse"
SSEPathSuffix string `json:"sse_path_suffix,omitempty"`
// List of SSE servers Configs
Servers []*SSEServer `json:"servers,omitempty"`
// List of match rules for filtering requests
MatchList []*MatchRule `json:"match_list,omitempty"`
// Flag to control whether user level server is enabled
EnableUserLevelServer bool `json:"enable_user_level_server,omitempty"`
// Rate limit config for MCP server
Ratelimit *MCPRatelimitConfig `json:"rate_limit,omitempty"`
}
func NewDefaultMcpServer() *McpServer {
return &McpServer{
Enable: false,
Servers: make([]*SSEServer, 0),
MatchList: make([]*MatchRule, 0),
EnableUserLevelServer: false,
}
}
const (
higressMcpServerEnvoyFilterName = "higress-config-mcp-server"
)
func validMcpServer(m *McpServer) error {
if m == nil {
return nil
}
if m.EnableUserLevelServer && m.Redis == nil {
return errors.New("redis config cannot be empty when user level server is enabled")
}
// Validate match rule types
if m.MatchList != nil {
validMatchRuleTypes := map[string]bool{
"exact": true,
"prefix": true,
"suffix": true,
"contains": true,
"regex": true,
}
validUpstreamTypes := map[string]bool{
"rest": true,
"sse": true,
"streamable": true,
}
for _, rule := range m.MatchList {
if rule.MatchRuleType == "" {
return errors.New("match_rule_type cannot be empty, must be one of: exact, prefix, suffix, contains, regex")
}
if !validMatchRuleTypes[rule.MatchRuleType] {
return fmt.Errorf("invalid match_rule_type: %s, must be one of: exact, prefix, suffix, contains, regex", rule.MatchRuleType)
}
if rule.UpstreamType != "" && !validUpstreamTypes[rule.UpstreamType] {
return fmt.Errorf("invalid upstream_type: %s, must be one of: rest, sse, streamable", rule.UpstreamType)
}
if rule.EnablePathRewrite && rule.UpstreamType != "sse" {
return errors.New("path rewrite is only supported for SSE upstream type")
}
}
}
return nil
}
func compareMcpServer(old *McpServer, new *McpServer) (Result, error) {
if old == nil && new == nil {
return ResultNothing, nil
}
if new == nil {
return ResultDelete, nil
}
if !reflect.DeepEqual(old, new) {
return ResultReplace, nil
}
return ResultNothing, nil
}
func deepCopyMcpServer(mcp *McpServer) (*McpServer, error) {
newMcp := NewDefaultMcpServer()
newMcp.Enable = mcp.Enable
if mcp.Redis != nil {
newMcp.Redis = &RedisConfig{
Address: mcp.Redis.Address,
Username: mcp.Redis.Username,
Password: mcp.Redis.Password,
DB: mcp.Redis.DB,
}
}
if mcp.Ratelimit != nil {
newMcp.Ratelimit = &MCPRatelimitConfig{
Limit: mcp.Ratelimit.Limit,
Window: mcp.Ratelimit.Window,
WhiteList: mcp.Ratelimit.WhiteList,
}
}
newMcp.SSEPathSuffix = mcp.SSEPathSuffix
newMcp.EnableUserLevelServer = mcp.EnableUserLevelServer
if len(mcp.Servers) > 0 {
newMcp.Servers = make([]*SSEServer, len(mcp.Servers))
for i, server := range mcp.Servers {
newServer := &SSEServer{
Name: server.Name,
Path: server.Path,
Type: server.Type,
DomainList: server.DomainList,
}
if server.Config != nil {
newServer.Config = make(map[string]interface{})
for k, v := range server.Config {
newServer.Config[k] = v
}
}
newMcp.Servers[i] = newServer
}
}
if len(mcp.MatchList) > 0 {
newMcp.MatchList = make([]*MatchRule, len(mcp.MatchList))
for i, rule := range mcp.MatchList {
newMcp.MatchList[i] = &MatchRule{
MatchRuleDomain: rule.MatchRuleDomain,
MatchRulePath: rule.MatchRulePath,
MatchRuleType: rule.MatchRuleType,
UpstreamType: rule.UpstreamType,
EnablePathRewrite: rule.EnablePathRewrite,
PathRewritePrefix: rule.PathRewritePrefix,
}
}
}
return newMcp, nil
}
type McpServerController struct {
Namespace string
mcpServer atomic.Value
Name string
eventHandler ItemEventHandler
reconciler *reconcile.Reconciler
}
func NewMcpServerController(namespace string) *McpServerController {
mcpController := &McpServerController{
Namespace: namespace,
mcpServer: atomic.Value{},
Name: "mcpServer",
}
mcpController.SetMcpServer(NewDefaultMcpServer())
return mcpController
}
func (m *McpServerController) GetName() string {
return m.Name
}
func (m *McpServerController) SetMcpServer(mcp *McpServer) {
m.mcpServer.Store(mcp)
}
func (m *McpServerController) GetMcpServer() *McpServer {
value := m.mcpServer.Load()
if value != nil {
if mcp, ok := value.(*McpServer); ok {
return mcp
}
}
return nil
}
func (m *McpServerController) AddOrUpdateHigressConfig(name util.ClusterNamespacedName, old *HigressConfig, new *HigressConfig) error {
if err := validMcpServer(new.McpServer); err != nil {
IngressLog.Errorf("data:%+v convert to mcp server, error: %+v", new.McpServer, err)
return nil
}
result, _ := compareMcpServer(old.McpServer, new.McpServer)
switch result {
case ResultReplace:
if newMcp, err := deepCopyMcpServer(new.McpServer); err != nil {
IngressLog.Infof("mcp server deepcopy error:%v", err)
} else {
m.SetMcpServer(newMcp)
IngressLog.Infof("AddOrUpdate Higress config mcp server")
m.eventHandler(higressMcpServerEnvoyFilterName)
IngressLog.Infof("send event with filter name:%s", higressMcpServerEnvoyFilterName)
}
case ResultDelete:
m.SetMcpServer(NewDefaultMcpServer())
IngressLog.Infof("Delete Higress config mcp server")
m.eventHandler(higressMcpServerEnvoyFilterName)
IngressLog.Infof("send event with filter name:%s", higressMcpServerEnvoyFilterName)
}
return nil
}
func (m *McpServerController) ValidHigressConfig(higressConfig *HigressConfig) error {
if higressConfig == nil {
return nil
}
if higressConfig.McpServer == nil {
return nil
}
return validMcpServer(higressConfig.McpServer)
}
func (m *McpServerController) RegisterItemEventHandler(eventHandler ItemEventHandler) {
m.eventHandler = eventHandler
}
func (m *McpServerController) RegisterMcpReconciler(reconciler *reconcile.Reconciler) {
m.reconciler = reconciler
}
func (m *McpServerController) ConstructEnvoyFilters() ([]*config.Config, error) {
configs := make([]*config.Config, 0)
mcpServer := m.GetMcpServer()
namespace := m.Namespace
if mcpServer == nil || !mcpServer.Enable {
return configs, nil
}
// mcp-session envoy filter
mcpSessionStruct := m.constructMcpSessionStruct(mcpServer)
if mcpSessionStruct != "" {
sessionConfig := &config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.EnvoyFilter,
Name: higressMcpServerEnvoyFilterName,
Namespace: namespace,
},
Spec: &networking.EnvoyFilter{
ConfigPatches: []*networking.EnvoyFilter_EnvoyConfigObjectPatch{
{
ApplyTo: networking.EnvoyFilter_HTTP_FILTER,
Match: &networking.EnvoyFilter_EnvoyConfigObjectMatch{
Context: networking.EnvoyFilter_GATEWAY,
ObjectTypes: &networking.EnvoyFilter_EnvoyConfigObjectMatch_Listener{
Listener: &networking.EnvoyFilter_ListenerMatch{
FilterChain: &networking.EnvoyFilter_ListenerMatch_FilterChainMatch{
Filter: &networking.EnvoyFilter_ListenerMatch_FilterMatch{
Name: "envoy.filters.network.http_connection_manager",
SubFilter: &networking.EnvoyFilter_ListenerMatch_SubFilterMatch{
Name: "envoy.filters.http.cors",
},
},
},
},
},
},
Patch: &networking.EnvoyFilter_Patch{
Operation: networking.EnvoyFilter_Patch_INSERT_AFTER,
Value: util.BuildPatchStruct(mcpSessionStruct),
},
},
},
},
}
configs = append(configs, sessionConfig)
}
// mcp-server envoy filter
mcpServerStruct := m.constructMcpServerStruct(mcpServer)
if mcpServerStruct != "" {
serverConfig := &config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.EnvoyFilter,
Name: higressMcpServerEnvoyFilterName + "-server",
Namespace: namespace,
},
Spec: &networking.EnvoyFilter{
ConfigPatches: []*networking.EnvoyFilter_EnvoyConfigObjectPatch{
{
ApplyTo: networking.EnvoyFilter_HTTP_FILTER,
Match: &networking.EnvoyFilter_EnvoyConfigObjectMatch{
Context: networking.EnvoyFilter_GATEWAY,
ObjectTypes: &networking.EnvoyFilter_EnvoyConfigObjectMatch_Listener{
Listener: &networking.EnvoyFilter_ListenerMatch{
FilterChain: &networking.EnvoyFilter_ListenerMatch_FilterChainMatch{
Filter: &networking.EnvoyFilter_ListenerMatch_FilterMatch{
Name: "envoy.filters.network.http_connection_manager",
SubFilter: &networking.EnvoyFilter_ListenerMatch_SubFilterMatch{
Name: "envoy.filters.http.router",
},
},
},
},
},
},
Patch: &networking.EnvoyFilter_Patch{
Operation: networking.EnvoyFilter_Patch_INSERT_BEFORE,
Value: util.BuildPatchStruct(mcpServerStruct),
},
},
},
},
}
configs = append(configs, serverConfig)
}
return configs, nil
}
func (m *McpServerController) constructMcpSessionStruct(mcp *McpServer) string {
// Build match_list configuration
matchList := "[]"
var matchConfigs []string
if len(mcp.MatchList) > 0 {
for _, rule := range mcp.MatchList {
matchConfigs = append(matchConfigs, fmt.Sprintf(`{
"match_rule_domain": "%s",
"match_rule_path": "%s",
"match_rule_type": "%s",
"upstream_type": "%s",
"enable_path_rewrite": %t,
"path_rewrite_prefix": "%s"
}`, rule.MatchRuleDomain, rule.MatchRulePath, rule.MatchRuleType, rule.UpstreamType, rule.EnablePathRewrite, rule.PathRewritePrefix))
}
}
if m.reconciler != nil {
vsFromMcp := m.reconciler.GetAllConfigs(gvk.VirtualService)
for _, c := range vsFromMcp {
vs := c.Spec.(*networking.VirtualService)
var host string
if len(vs.Hosts) > 1 {
host = fmt.Sprintf("(%s)", strings.Join(vs.Hosts, "|"))
} else {
host = vs.Hosts[0]
}
path := vs.Http[0].Match[0].Uri.GetPrefix()
matchConfigs = append(matchConfigs, fmt.Sprintf(`{
"match_rule_domain": "%s",
"match_rule_path": "%s",
"match_rule_type": "prefix"
}`, host, path))
}
}
matchList = fmt.Sprintf("[%s]", strings.Join(matchConfigs, ","))
// Build redis configuration
redisConfig := "null"
if mcp.Redis != nil {
redisConfig = fmt.Sprintf(`{
"address": "%s",
"username": "%s",
"password": "%s",
"db": %d
}`, mcp.Redis.Address, mcp.Redis.Username, mcp.Redis.Password, mcp.Redis.DB)
}
// Build rate limit configuration
rateLimitConfig := "null"
if mcp.Ratelimit != nil {
whiteList := "[]"
if len(mcp.Ratelimit.WhiteList) > 0 {
whiteList = fmt.Sprintf(`["%s"]`, strings.Join(mcp.Ratelimit.WhiteList, `","`))
}
rateLimitConfig = fmt.Sprintf(`{
"limit": %d,
"window": %d,
"white_list": %s
}`, mcp.Ratelimit.Limit, mcp.Ratelimit.Window, whiteList)
}
// Build complete configuration structure
return fmt.Sprintf(`{
"name": "envoy.filters.http.golang",
"typed_config": {
"@type": "type.googleapis.com/udpa.type.v1.TypedStruct",
"type_url": "type.googleapis.com/envoy.extensions.filters.http.golang.v3alpha.Config",
"value": {
"library_id": "mcp-session",
"library_path": "/var/lib/istio/envoy/golang-filter.so",
"plugin_name": "mcp-session",
"plugin_config": {
"@type": "type.googleapis.com/xds.type.v3.TypedStruct",
"value": {
"redis": %s,
"rate_limit": %s,
"sse_path_suffix": "%s",
"match_list": %s,
"enable_user_level_server": %t
}
}
}
}
}`,
redisConfig,
rateLimitConfig,
mcp.SSEPathSuffix,
matchList,
mcp.EnableUserLevelServer)
}
func (m *McpServerController) constructMcpServerStruct(mcp *McpServer) string {
// Build servers configuration
servers := "[]"
if len(mcp.Servers) > 0 {
serverConfigs := make([]string, len(mcp.Servers))
for i, server := range mcp.Servers {
serverConfig := fmt.Sprintf(`{
"name": "%s",
"path": "%s",
"type": "%s"`,
server.Name, server.Path, server.Type)
if len(server.DomainList) > 0 {
domainList := fmt.Sprintf(`["%s"]`, strings.Join(server.DomainList, `","`))
serverConfig += fmt.Sprintf(`,
"domain_list": %s`, domainList)
}
if len(server.Config) > 0 {
config, _ := json.Marshal(server.Config)
serverConfig += fmt.Sprintf(`,
"config": %s`, string(config))
}
serverConfig += "}"
serverConfigs[i] = serverConfig
}
servers = fmt.Sprintf("[%s]", strings.Join(serverConfigs, ","))
}
// Build complete configuration structure
return fmt.Sprintf(`{
"name": "envoy.filters.http.golang",
"typed_config": {
"@type": "type.googleapis.com/udpa.type.v1.TypedStruct",
"type_url": "type.googleapis.com/envoy.extensions.filters.http.golang.v3alpha.Config",
"value": {
"library_id": "mcp-server",
"library_path": "/var/lib/istio/envoy/golang-filter.so",
"plugin_name": "mcp-server",
"plugin_config": {
"@type": "type.googleapis.com/xds.type.v3.TypedStruct",
"value": {
"servers": %s
}
}
}
}
}`, servers)
}