plugins/golang-filter/mcp-session/config.go (121 lines of code) (raw):

package mcp_session import ( "fmt" _ "net/http/pprof" xds "github.com/cncf/xds/go/xds/type/v3" "google.golang.org/protobuf/types/known/anypb" "github.com/alibaba/higress/plugins/golang-filter/mcp-session/common" "github.com/alibaba/higress/plugins/golang-filter/mcp-session/handler" "github.com/envoyproxy/envoy/contrib/golang/common/go/api" ) const Name = "mcp-session" const Version = "1.0.0" const ConfigPathSuffix = "/config" const DefaultServerName = "higress-mcp-server" var GlobalSSEPathSuffix = "/sse" type config struct { matchList []common.MatchRule enableUserLevelServer bool rateLimitConfig *handler.MCPRatelimitConfig defaultServer *common.SSEServer redisClient *common.RedisClient } func (c *config) Destroy() { if c.redisClient != nil { api.LogDebug("Closing Redis client") c.redisClient.Close() } } type Parser struct { } // Parse the filter configuration func (p *Parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (interface{}, error) { configStruct := &xds.TypedStruct{} if err := any.UnmarshalTo(configStruct); err != nil { return nil, err } v := configStruct.Value conf := &config{ matchList: make([]common.MatchRule, 0), } // Parse match_list if exists if matchList, ok := v.AsMap()["match_list"].([]interface{}); ok { conf.matchList = common.ParseMatchList(matchList) } // Redis configuration is optional if redisConfigMap, ok := v.AsMap()["redis"].(map[string]interface{}); ok { redisConfig, err := common.ParseRedisConfig(redisConfigMap) if err != nil { return nil, fmt.Errorf("failed to parse redis config: %w", err) } redisClient, err := common.NewRedisClient(redisConfig) if err != nil { api.LogErrorf("Failed to initialize Redis client: %w", err) } else { api.LogDebug("Redis client initialized") } conf.redisClient = redisClient } else { api.LogDebug("Redis configuration not provided, running without Redis") } enableUserLevelServer, ok := v.AsMap()["enable_user_level_server"].(bool) if !ok { enableUserLevelServer = false if conf.redisClient == nil { return nil, fmt.Errorf("redis configuration is not provided, enable_user_level_server is true") } } conf.enableUserLevelServer = enableUserLevelServer if rateLimit, ok := v.AsMap()["rate_limit"].(map[string]interface{}); ok { rateLimitConfig := &handler.MCPRatelimitConfig{} if limit, ok := rateLimit["limit"].(float64); ok { rateLimitConfig.Limit = int(limit) } if window, ok := rateLimit["window"].(float64); ok { rateLimitConfig.Window = int(window) } if whiteList, ok := rateLimit["white_list"].([]interface{}); ok { for _, item := range whiteList { if uid, ok := item.(string); ok { rateLimitConfig.Whitelist = append(rateLimitConfig.Whitelist, uid) } } } if errorText, ok := rateLimit["error_text"].(string); ok { rateLimitConfig.ErrorText = errorText } conf.rateLimitConfig = rateLimitConfig } ssePathSuffix, ok := v.AsMap()["sse_path_suffix"].(string) if !ok || ssePathSuffix == "" { return nil, fmt.Errorf("sse path suffix is not set or empty") } GlobalSSEPathSuffix = ssePathSuffix return conf, nil } func (p *Parser) Merge(parent interface{}, child interface{}) interface{} { parentConfig := parent.(*config) childConfig := child.(*config) newConfig := *parentConfig if childConfig.matchList != nil { newConfig.matchList = childConfig.matchList } newConfig.enableUserLevelServer = childConfig.enableUserLevelServer if childConfig.rateLimitConfig != nil { newConfig.rateLimitConfig = childConfig.rateLimitConfig } if childConfig.defaultServer != nil { newConfig.defaultServer = childConfig.defaultServer } return &newConfig } func FilterFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter { conf, ok := c.(*config) if !ok { panic("unexpected config type") } return &filter{ callbacks: callbacks, config: conf, stopChan: make(chan struct{}), mcpConfigHandler: handler.NewMCPConfigHandler(conf.redisClient, callbacks), mcpRatelimitHandler: handler.NewMCPRatelimitHandler(conf.redisClient, callbacks, conf.rateLimitConfig), } }