plugins/golang-filter/mcp-server/config.go (114 lines of code) (raw):
package mcp_server
import (
"fmt"
_ "github.com/alibaba/higress/plugins/golang-filter/mcp-server/registry/nacos"
_ "github.com/alibaba/higress/plugins/golang-filter/mcp-server/servers/gorm"
mcp_session "github.com/alibaba/higress/plugins/golang-filter/mcp-session"
"github.com/alibaba/higress/plugins/golang-filter/mcp-session/common"
xds "github.com/cncf/xds/go/xds/type/v3"
"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
"google.golang.org/protobuf/types/known/anypb"
)
// Identification constants for this filter package.
const (
	// Name is the string identifier of this filter ("mcp-server").
	// NOTE(review): presumably the name the filter is registered under; the
	// registration call is not in this file — confirm against the caller.
	Name = "mcp-server"

	// Version is the version string reported for this filter.
	Version = "1.0.0"
)
// SSEServerWrapper pairs one SSE server with the domains configured for it.
type SSEServerWrapper struct {
// BaseServer is the SSE server that Parser.Parse builds for one entry of the
// "servers" configuration list.
BaseServer *common.SSEServer
// DomainList holds the string values of the entry's "domain_list"; it is
// ["*"] when that key is absent or is not a list.
DomainList []string
}
// config is the parsed filter configuration produced by Parser.Parse.
type config struct {
// servers lists the SSE servers built from the "servers" configuration key.
// Parse always allocates it, so it is non-nil (possibly empty) on any config
// that Parse returns.
servers []*SSEServerWrapper
}
// Destroy closes the BaseServer of every wrapper held by the config, in
// slice order. It does not clear c.servers.
func (c *config) Destroy() {
	for i := range c.servers {
		c.servers[i].BaseServer.Close()
	}
}
// Parser is a stateless type whose methods (Parse and Merge) turn the
// filter's raw configuration into *config values and combine them.
type Parser struct{}
// Parse decodes the filter configuration carried in any (an xds TypedStruct)
// and builds one SSE server per entry of its "servers" list.
//
// When "servers" is missing or is not a list, an empty, non-nil config is
// returned without error. Otherwise every entry must be an object with string
// "type", "path" and "name" keys, and "type" must name a server known to
// common.GlobalRegistry; the optional "config" object is handed to that
// server's ParseConfig and the optional "domain_list" restricts the domains.
//
// On any error the returned config is nil. SSE servers already built for
// earlier entries are closed before returning, because the caller never
// receives a config on which it could call Destroy.
//
// The callbacks argument is not used.
func (p *Parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (interface{}, error) {
	configStruct := &xds.TypedStruct{}
	if err := any.UnmarshalTo(configStruct); err != nil {
		return nil, err
	}

	conf := &config{
		servers: make([]*SSEServerWrapper, 0),
	}

	serverConfigs, ok := configStruct.Value.AsMap()["servers"].([]interface{})
	if !ok {
		api.LogDebug("No servers are configured")
		return conf, nil
	}

	for _, serverConfig := range serverConfigs {
		wrapper, err := parseServerEntry(serverConfig)
		if err != nil {
			// Release what was built so far; nobody else holds a reference.
			conf.Destroy()
			return nil, err
		}
		conf.servers = append(conf.servers, wrapper)
	}
	return conf, nil
}

// parseServerEntry validates one element of the "servers" list and builds the
// SSE server wrapper for it. Keys are checked in the order type, path,
// domain_list, name, registry lookup, config.
func parseServerEntry(serverConfig interface{}) (*SSEServerWrapper, error) {
	serverConfigMap, ok := serverConfig.(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("server config must be an object")
	}

	serverType, ok := serverConfigMap["type"].(string)
	if !ok {
		return nil, fmt.Errorf("server type is not set")
	}

	serverPath, ok := serverConfigMap["path"].(string)
	if !ok {
		return nil, fmt.Errorf("server %s path is not set", serverType)
	}

	serverDomainList := parseDomainList(serverConfigMap["domain_list"])

	serverName, ok := serverConfigMap["name"].(string)
	if !ok {
		return nil, fmt.Errorf("server %s name is not set", serverType)
	}

	server := common.GlobalRegistry.GetServer(serverType)
	if server == nil {
		return nil, fmt.Errorf("server %s is not registered", serverType)
	}

	// A missing or non-object "config" is not an error: ParseConfig is still
	// called, with a nil map.
	serverOptions, ok := serverConfigMap["config"].(map[string]interface{})
	if !ok {
		api.LogDebug(fmt.Sprintf("No config provided for server %s", serverType))
	}
	// NOTE(review): this logs the raw per-server config at debug level;
	// confirm it cannot carry credentials before enabling debug logs.
	api.LogDebug(fmt.Sprintf("Server config: %+v", serverOptions))

	if err := server.ParseConfig(serverOptions); err != nil {
		return nil, fmt.Errorf("failed to parse server config: %w", err)
	}

	serverInstance, err := server.NewServer(serverName)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize server %s: %w", serverType, err)
	}

	wrapper := &SSEServerWrapper{
		BaseServer: common.NewSSEServer(serverInstance,
			common.WithSSEEndpoint(fmt.Sprintf("%s%s", serverPath, mcp_session.GlobalSSEPathSuffix)),
			common.WithMessageEndpoint(serverPath)),
		DomainList: serverDomainList,
	}
	api.LogDebug(fmt.Sprintf("Registered MCP Server: %s", serverType))
	return wrapper, nil
}

// parseDomainList converts the raw "domain_list" value into a string slice.
// A value that is not a list (including a missing key) yields ["*"]. Inside a
// list, non-string elements are skipped, so the result may be empty.
func parseDomainList(raw interface{}) []string {
	entries, ok := raw.([]interface{})
	if !ok {
		return []string{"*"}
	}
	domains := make([]string, 0, len(entries))
	for _, entry := range entries {
		if domain, ok := entry.(string); ok {
			domains = append(domains, domain)
		}
	}
	return domains
}
// Merge combines a parent and a child configuration into a new *config.
//
// The result starts as a shallow copy of the parent; when the child's servers
// slice is non-nil it replaces the parent's list wholesale (the lists are not
// concatenated). Parse always allocates a non-nil slice, so a child produced
// by Parse overrides the parent even when it lists no servers.
//
// Both arguments must be *config; any other type makes the assertion panic.
func (p *Parser) Merge(parent interface{}, child interface{}) interface{} {
	base, override := parent.(*config), child.(*config)

	merged := *base
	if override.servers != nil {
		merged.servers = override.servers
	}
	return &merged
}
// FilterFactory returns a new filter bound to the given parsed configuration
// and callback handler.
//
// c must be a *config; any other type triggers a panic with the message
// "unexpected config type".
func FilterFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter {
	if conf, ok := c.(*config); ok {
		return &filter{
			config:    conf,
			callbacks: callbacks,
		}
	}
	panic("unexpected config type")
}