registry/nacos/mcpserver/watcher.go (866 lines of code) (raw):
// Copyright (c) 2022 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mcpserver
import (
"encoding/json"
"errors"
"fmt"
"net"
"strconv"
"strings"
"sync"
"time"
apiv1 "github.com/alibaba/higress/api/networking/v1"
"github.com/alibaba/higress/pkg/common"
common2 "github.com/alibaba/higress/pkg/ingress/kube/common"
provider "github.com/alibaba/higress/registry"
"github.com/alibaba/higress/registry/memory"
"github.com/golang/protobuf/ptypes/wrappers"
"github.com/nacos-group/nacos-sdk-go/v2/clients"
"github.com/nacos-group/nacos-sdk-go/v2/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
"github.com/nacos-group/nacos-sdk-go/v2/model"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
"go.uber.org/atomic"
"istio.io/api/networking/v1alpha3"
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/constants"
"istio.io/istio/pkg/config/schema/gvk"
"istio.io/istio/pkg/log"
"istio.io/istio/pkg/util/sets"
)
const (
DefaultInitTimeout = time.Second * 10
DefaultNacosTimeout = 5000
DefaultNacosLogLevel = "info"
DefaultNacosLogDir = "/var/log/nacos/log/mcp/log"
DefaultNacosCacheDir = "/var/log/nacos/log/mcp/cache"
DefaultNacosNotLoadCache = true
DefaultNacosLogMaxAge = 3
DefaultRefreshInterval = time.Second * 30
DefaultRefreshIntervalLimit = time.Second * 10
DefaultFetchPageSize = 50
DefaultJoiner = "@@"
NacosV3LabelKey = "isV3"
)
var mcpServerLog = log.RegisterScope("McpServer", "Nacos Mcp Server Watcher process.")
type watcher struct {
provider.BaseWatcher
apiv1.RegistryConfig
watchingConfig map[string]bool
watchingConfigRefs map[string]sets.Set[string]
configToConfigListener map[string]*MultiConfigListener
serviceCache map[string]*ServiceCache
configToService map[string]string
credentialKeyToName map[string]map[string]string
RegistryType provider.ServiceRegistryType
Status provider.WatcherStatus
configClient config_client.IConfigClient
serverConfig []constant.ServerConfig
cache memory.Cache
mutex *sync.Mutex
subMutex *sync.Mutex
callbackMutex *sync.Mutex
stop chan struct{}
isStop bool
updateCacheWhenEmpty bool
nacosClientConfig *constant.ClientConfig
namespace string
clusterId string
authOption provider.AuthOption
}
type WatcherOption func(w *watcher)
func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, error) {
w := &watcher{
watchingConfig: make(map[string]bool),
configToService: make(map[string]string),
watchingConfigRefs: make(map[string]sets.Set[string]),
configToConfigListener: make(map[string]*MultiConfigListener),
credentialKeyToName: make(map[string]map[string]string),
serviceCache: map[string]*ServiceCache{},
RegistryType: "nacos3",
Status: provider.UnHealthy,
cache: cache,
mutex: &sync.Mutex{},
subMutex: &sync.Mutex{},
callbackMutex: &sync.Mutex{},
stop: make(chan struct{}),
}
w.NacosRefreshInterval = int64(DefaultRefreshInterval)
for _, opt := range opts {
opt(w)
}
// The nacos mcp server uses these restricted namespaces and groups, and may be adjusted in the future.
w.NacosNamespace = "nacos-default-mcp"
w.NacosNamespaceId = w.NacosNamespace
w.NacosGroups = []string{"mcp-server"}
mcpServerLog.Infof("new nacos mcp server watcher with config Name:%s", w.Name)
w.nacosClientConfig = constant.NewClientConfig(
constant.WithTimeoutMs(DefaultNacosTimeout),
constant.WithLogLevel(DefaultNacosLogLevel),
constant.WithLogDir(DefaultNacosLogDir),
constant.WithCacheDir(DefaultNacosCacheDir),
constant.WithNotLoadCacheAtStart(DefaultNacosNotLoadCache),
constant.WithLogRollingConfig(&constant.ClientLogRollingConfig{
MaxAge: DefaultNacosLogMaxAge,
}),
constant.WithUpdateCacheWhenEmpty(w.updateCacheWhenEmpty),
constant.WithNamespaceId(w.NacosNamespaceId),
constant.WithAccessKey(w.NacosAccessKey),
constant.WithSecretKey(w.NacosSecretKey),
constant.WithUsername(w.authOption.NacosUsername),
constant.WithPassword(w.authOption.NacosPassword),
)
initTimer := time.NewTimer(DefaultInitTimeout)
w.serverConfig = []constant.ServerConfig{
*constant.NewServerConfig(w.Domain, uint64(w.Port)),
}
success := make(chan struct{})
go func() {
configClient, err := clients.NewConfigClient(vo.NacosClientParam{
ClientConfig: w.nacosClientConfig,
ServerConfigs: w.serverConfig,
})
if err == nil {
w.configClient = configClient
close(success)
} else {
mcpServerLog.Errorf("can not create naming client, err:%v", err)
}
}()
select {
case <-initTimer.C:
return nil, errors.New("new nacos mcp server watcher timeout")
case <-success:
return w, nil
}
}
func WithNacosAddressServer(nacosAddressServer string) WatcherOption {
return func(w *watcher) {
w.NacosAddressServer = nacosAddressServer
}
}
func WithNacosAccessKey(nacosAccessKey string) WatcherOption {
return func(w *watcher) {
w.NacosAccessKey = nacosAccessKey
}
}
func WithNacosSecretKey(nacosSecretKey string) WatcherOption {
return func(w *watcher) {
w.NacosSecretKey = nacosSecretKey
}
}
func WithNacosRefreshInterval(refreshInterval int64) WatcherOption {
return func(w *watcher) {
if refreshInterval < int64(DefaultRefreshIntervalLimit) {
refreshInterval = int64(DefaultRefreshIntervalLimit)
}
w.NacosRefreshInterval = refreshInterval
}
}
func WithType(t string) WatcherOption {
return func(w *watcher) {
w.Type = t
}
}
func WithName(name string) WatcherOption {
return func(w *watcher) {
w.Name = name
}
}
func WithDomain(domain string) WatcherOption {
return func(w *watcher) {
w.Domain = domain
}
}
func WithPort(port uint32) WatcherOption {
return func(w *watcher) {
w.Port = port
}
}
func WithMcpExportDomains(exportDomains []string) WatcherOption {
return func(w *watcher) {
w.McpServerExportDomains = exportDomains
}
}
func WithMcpBaseUrl(url string) WatcherOption {
return func(w *watcher) {
w.McpServerBaseUrl = url
}
}
func WithEnableMcpServer(enable *wrappers.BoolValue) WatcherOption {
return func(w *watcher) {
w.EnableMCPServer = enable
}
}
func WithNamespace(ns string) WatcherOption {
return func(w *watcher) {
w.namespace = ns
}
}
func WithClusterId(id string) WatcherOption {
return func(w *watcher) {
w.clusterId = id
}
}
func WithAuthOption(authOption provider.AuthOption) WatcherOption {
return func(w *watcher) {
w.authOption = authOption
}
}
func (w *watcher) Run() {
ticker := time.NewTicker(time.Duration(w.NacosRefreshInterval))
defer ticker.Stop()
w.Status = provider.ProbeWatcherStatus(w.Domain, strconv.FormatUint(uint64(w.Port), 10))
err := w.fetchAllMcpConfig()
if err != nil {
mcpServerLog.Errorf("first fetch mcp server config failed, err:%v", err)
} else {
w.Ready(true)
}
for {
select {
case <-ticker.C:
err := w.fetchAllMcpConfig()
if err != nil {
mcpServerLog.Errorf("fetch mcp server config failed, err:%v", err)
} else {
w.Ready(true)
}
case <-w.stop:
return
}
}
}
func (w *watcher) fetchAllMcpConfig() error {
w.mutex.Lock()
defer w.mutex.Unlock()
if w.isStop {
return nil
}
fetchedConfigs := make(map[string]bool)
var tries int
isV3 := true
if w.EnableMCPServer != nil {
isV3 = w.EnableMCPServer.GetValue()
}
for _, groupName := range w.NacosGroups {
for page := 1; ; page++ {
ss, err := w.configClient.SearchConfig(vo.SearchConfigParam{
Group: groupName,
Search: "blur",
PageNo: page,
PageSize: DefaultFetchPageSize,
IsV3: isV3,
})
if err != nil {
if tries > 10 {
return err
}
mcpServerLog.Errorf("fetch nacos config list failed, err:%v, pageNo:%d", err, page)
page--
tries++
continue
}
for _, item := range ss.PageItems {
fetchedConfigs[groupName+DefaultJoiner+item.DataId] = true
}
if len(ss.PageItems) < DefaultFetchPageSize {
break
}
}
}
for key := range w.watchingConfig {
if _, exist := fetchedConfigs[key]; !exist {
s := strings.Split(key, DefaultJoiner)
err := w.unsubscribe(s[0], s[1])
if err != nil {
return err
}
delete(w.watchingConfig, key)
}
}
wg := sync.WaitGroup{}
subscribeFailed := atomic.NewBool(false)
watchingKeys := make(chan string, len(fetchedConfigs))
for key := range fetchedConfigs {
s := strings.Split(key, DefaultJoiner)
if _, exist := w.watchingConfig[key]; !exist {
wg.Add(1)
go func(k string) {
err := w.subscribe(s[0], s[1])
if err != nil {
subscribeFailed.Store(true)
mcpServerLog.Errorf("subscribe failed, group: %v, service: %v, errors: %v", s[0], s[1], err)
} else {
watchingKeys <- k
}
wg.Done()
}(key)
}
}
wg.Wait()
close(watchingKeys)
for key := range watchingKeys {
w.watchingConfig[key] = true
}
if subscribeFailed.Load() {
return errors.New("subscribe services failed")
}
return nil
}
func (w *watcher) unsubscribe(groupName string, dataId string) error {
mcpServerLog.Infof("unsubscribe mcp server, groupName:%s, dataId:%s", groupName, dataId)
defer w.UpdateService()
err := w.configClient.CancelListenConfig(vo.ConfigParam{
DataId: dataId,
Group: groupName,
})
if err != nil {
mcpServerLog.Errorf("unsubscribe mcp server error:%v, groupName:%s, dataId:%s", err, groupName, dataId)
return err
}
key := strings.Join([]string{w.Name, w.NacosNamespace, groupName, dataId}, DefaultJoiner)
w.configToConfigListener[key].Stop()
delete(w.watchingConfigRefs, key)
delete(w.configToConfigListener, key)
// remove service for this config
configKey := strings.Join([]string{groupName, dataId}, DefaultJoiner)
svcInfo := w.configToService[configKey]
split := strings.Split(svcInfo, DefaultJoiner)
svcNamespace := split[0]
svcGroup := split[1]
svcName := split[2]
if w.serviceCache[svcNamespace] != nil {
err = w.serviceCache[svcNamespace].RemoveListener(svcGroup, svcName, configKey)
if err != nil {
mcpServerLog.Errorf("remove service listener error:%v, groupName:%s, dataId:%s", err, groupName, dataId)
}
}
delete(w.configToService, configKey)
w.cache.UpdateConfigCache(config.GroupVersionKind{}, key, nil, true)
return nil
}
func (w *watcher) subscribe(groupName string, dataId string) error {
mcpServerLog.Infof("subscribe mcp server, groupName:%s, dataId:%s", groupName, dataId)
// first we get this config and callback manually
content, err := w.configClient.GetConfig(vo.ConfigParam{
DataId: dataId,
Group: groupName,
})
if err != nil {
mcpServerLog.Errorf("get config %s/%s err: %v", groupName, dataId, err)
} else {
w.getConfigCallback(w.NacosNamespace, groupName, dataId, content)
}
// second, we set callback for this config
err = w.configClient.ListenConfig(vo.ConfigParam{
DataId: dataId,
Group: groupName,
OnChange: w.getConfigCallback,
})
if err != nil {
mcpServerLog.Errorf("subscribe mcp server error:%v, groupName:%s, dataId:%s", err, groupName, dataId)
return err
}
return nil
}
func (w *watcher) getConfigCallback(namespace, group, dataId, data string) {
mcpServerLog.Infof("get config callback, namespace:%s, groupName:%s, dataId:%s", namespace, group, dataId)
if data == "" {
return
}
key := strings.Join([]string{w.Name, w.NacosNamespace, group, dataId}, DefaultJoiner)
routeName := fmt.Sprintf("%s-%s-%s", provider.IstioMcpAutoGeneratedHttpRouteName, group, strings.TrimSuffix(dataId, ".json"))
mcpServer := &provider.McpServer{}
if err := json.Unmarshal([]byte(data), mcpServer); err != nil {
mcpServerLog.Errorf("Unmarshal config data to mcp server error:%v, namespace:%s, groupName:%s, dataId:%s", err, namespace, group, dataId)
return
}
if mcpServer.Protocol == provider.StdioProtocol || mcpServer.Protocol == provider.DubboProtocol || mcpServer.Protocol == provider.McpSSEProtocol {
return
}
// process mcp service
w.subMutex.Lock()
defer w.subMutex.Unlock()
if err := w.buildServiceEntryForMcpServer(mcpServer, group, dataId); err != nil {
mcpServerLog.Errorf("build service entry for mcp server failed, namespace %v, group: %v, dataId %v, errors: %v", namespace, group, dataId, err)
}
// process mcp wasm
// only generate wasm plugin for http protocol mcp server
if mcpServer.Protocol != provider.HttpProtocol {
return
}
if _, exist := w.configToConfigListener[key]; !exist {
w.configToConfigListener[key] = NewMultiConfigListener(w.configClient, w.multiCallback(mcpServer, routeName, key))
}
if _, exist := w.watchingConfigRefs[key]; !exist {
w.watchingConfigRefs[key] = sets.New[string]()
}
listener := w.configToConfigListener[key]
curRef := sets.Set[string]{}
// add description ref
curRef.Insert(strings.Join([]string{provider.DefaultMcpToolsGroup, mcpServer.ToolsDescriptionRef}, DefaultJoiner))
// add credential ref
credentialNameMap := map[string]string{}
for name, ref := range mcpServer.Credentials {
credKey := strings.Join([]string{provider.DefaultMcpCredentialsGroup, ref.Ref}, DefaultJoiner)
curRef.Insert(credKey)
credentialNameMap[credKey] = name
}
w.callbackMutex.Lock()
w.credentialKeyToName[key] = credentialNameMap
w.callbackMutex.Unlock()
toBeAdd := curRef.Difference(w.watchingConfigRefs[key])
toBeDelete := w.watchingConfigRefs[key].Difference(curRef)
var toBeListen, toBeUnListen []vo.ConfigParam
for item, _ := range toBeAdd {
split := strings.Split(item, DefaultJoiner)
toBeListen = append(toBeListen, vo.ConfigParam{
Group: split[0],
DataId: split[1],
})
}
for item, _ := range toBeDelete {
split := strings.Split(item, DefaultJoiner)
toBeUnListen = append(toBeUnListen, vo.ConfigParam{
Group: split[0],
DataId: split[1],
})
}
// listen description and credential config
if len(toBeListen) > 0 {
if err := listener.StartListen(toBeListen); err != nil {
mcpServerLog.Errorf("listen config ref failed, group: %v, dataId %v, errors: %v", group, dataId, err)
}
}
// cancel listen description and credential config
if len(toBeUnListen) > 0 {
if err := listener.CancelListen(toBeUnListen); err != nil {
mcpServerLog.Errorf("cancel listen config ref failed, group: %v, dataId %v, errors: %v", group, dataId, err)
}
}
}
func (w *watcher) multiCallback(server *provider.McpServer, routeName, configKey string) func(map[string]string) {
callback := func(configs map[string]string) {
defer w.UpdateService()
mcpServerLog.Infof("callback, ref config changed: %s", configKey)
rule := &provider.McpServerRule{
MatchRoute: []string{routeName},
Server: &provider.ServerConfig{
Name: server.Name,
Config: map[string]interface{}{},
},
}
// process mcp credential
credentialConfig := map[string]interface{}{}
for key, data := range configs {
if strings.HasPrefix(key, provider.DefaultMcpToolsGroup) {
// skip mcp tool description
continue
}
var cred interface{}
if err := json.Unmarshal([]byte(data), &cred); err != nil {
mcpServerLog.Errorf("unmarshal credential data %v to map error:%v", key, err)
}
w.callbackMutex.Lock()
name := w.credentialKeyToName[configKey][key]
w.callbackMutex.Unlock()
credentialConfig[name] = cred
}
rule.Server.Config["credentials"] = credentialConfig
// process mcp tool description
var allowTools []string
for key, toolData := range configs {
if strings.HasPrefix(key, provider.DefaultMcpCredentialsGroup) {
// skip mcp credentials
continue
}
toolsDescription := &provider.McpToolConfig{}
if err := json.Unmarshal([]byte(toolData), toolsDescription); err != nil {
mcpServerLog.Errorf("unmarshal toolsDescriptionRef to mcp tool config error:%v", err)
}
for _, t := range toolsDescription.Tools {
convertTool := &provider.McpTool{Name: t.Name, Description: t.Description}
toolMeta := toolsDescription.ToolsMeta[t.Name]
if toolMeta != nil && toolMeta.Enabled {
allowTools = append(allowTools, t.Name)
}
argsPosition, err := getArgsPositionFromToolMeta(toolMeta)
if err != nil {
mcpServerLog.Errorf("get args position from tool meta error:%v, tool name %v", err, t.Name)
}
requiredMap := sets.Set[string]{}
for _, s := range t.InputSchema.Required {
requiredMap.Insert(s)
}
for argsName, args := range t.InputSchema.Properties {
convertArgs, err := parseMcpArgs(args)
if err != nil {
mcpServerLog.Errorf("parse mcp args error:%v, tool name %v, args name %v", err, t.Name, argsName)
continue
}
convertArgs.Name = argsName
convertArgs.Required = requiredMap.Contains(argsName)
if pos, exist := argsPosition[argsName]; exist {
convertArgs.Position = pos
}
convertTool.Args = append(convertTool.Args, convertArgs)
mcpServerLog.Debugf("parseMcpArgs, toolArgs:%v", convertArgs)
}
requestTemplate, err := getRequestTemplateFromToolMeta(toolMeta)
if err != nil {
mcpServerLog.Errorf("get request template from tool meta error:%v, tool name %v", err, t.Name)
} else {
convertTool.RequestTemplate = requestTemplate
}
responseTemplate, err := getResponseTemplateFromToolMeta(toolMeta)
if err != nil {
mcpServerLog.Errorf("get response template from tool meta error:%v, tool name %v", err, t.Name)
} else {
convertTool.ResponseTemplate = responseTemplate
}
rule.Tools = append(rule.Tools, convertTool)
}
}
rule.Server.AllowTools = allowTools
wasmPluginConfig := &config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.WasmPlugin,
Namespace: w.namespace,
},
Spec: rule,
}
w.cache.UpdateConfigCache(gvk.WasmPlugin, configKey, wasmPluginConfig, false)
}
return callback
}
func (w *watcher) buildServiceEntryForMcpServer(mcpServer *provider.McpServer, configGroup, dataId string) error {
if mcpServer == nil || mcpServer.RemoteServerConfig == nil || mcpServer.RemoteServerConfig.ServiceRef == nil {
return nil
}
mcpServerLog.Debugf("ServiceRef %v for %v", mcpServer.RemoteServerConfig.ServiceRef, dataId)
configKey := strings.Join([]string{configGroup, dataId}, DefaultJoiner)
serviceGroup := mcpServer.RemoteServerConfig.ServiceRef.GroupName
serviceNamespace := mcpServer.RemoteServerConfig.ServiceRef.NamespaceId
serviceName := mcpServer.RemoteServerConfig.ServiceRef.ServiceName
if serviceNamespace == "" {
serviceNamespace = provider.DefaultNacosServiceNamespace
}
// update config to service and unsubscribe old service
curSvcKey := strings.Join([]string{serviceNamespace, serviceGroup, serviceName}, DefaultJoiner)
if svcKey, exist := w.configToService[configKey]; exist && svcKey != curSvcKey {
split := strings.Split(svcKey, DefaultJoiner)
if svcCache, has := w.serviceCache[split[0]]; has {
if err := svcCache.RemoveListener(split[1], split[2], configKey); err != nil {
mcpServerLog.Errorf("remove listener error:%v", err)
}
}
}
w.configToService[configKey] = curSvcKey
if _, exist := w.serviceCache[serviceNamespace]; !exist {
namingConfig := constant.NewClientConfig(
constant.WithTimeoutMs(DefaultNacosTimeout),
constant.WithLogLevel(DefaultNacosLogLevel),
constant.WithLogDir(DefaultNacosLogDir),
constant.WithCacheDir(DefaultNacosCacheDir),
constant.WithNotLoadCacheAtStart(DefaultNacosNotLoadCache),
constant.WithLogRollingConfig(&constant.ClientLogRollingConfig{
MaxAge: DefaultNacosLogMaxAge,
}),
constant.WithUpdateCacheWhenEmpty(w.updateCacheWhenEmpty),
constant.WithNamespaceId(serviceNamespace),
constant.WithAccessKey(w.NacosAccessKey),
constant.WithSecretKey(w.NacosSecretKey),
constant.WithUsername(w.authOption.NacosUsername),
constant.WithPassword(w.authOption.NacosPassword),
)
client, err := clients.NewNamingClient(vo.NacosClientParam{
ClientConfig: namingConfig,
ServerConfigs: w.serverConfig,
})
if err == nil {
w.serviceCache[serviceNamespace] = NewServiceCache(client)
} else {
return fmt.Errorf("can not create naming client err:%v", err)
}
}
svcCache := w.serviceCache[serviceNamespace]
err := svcCache.AddListener(serviceGroup, serviceName, configKey, w.getServiceCallback(mcpServer, configGroup, dataId))
if err != nil {
return fmt.Errorf("add listener for dataId %v, service %s/%s error:%v", dataId, serviceGroup, serviceName, err)
}
return nil
}
func (w *watcher) getServiceCallback(server *provider.McpServer, configGroup, dataId string) func(services []model.Instance) {
groupName := server.RemoteServerConfig.ServiceRef.GroupName
if groupName == "DEFAULT_GROUP" {
groupName = "DEFAULT-GROUP"
}
namespace := server.RemoteServerConfig.ServiceRef.NamespaceId
serviceName := server.RemoteServerConfig.ServiceRef.ServiceName
path := server.RemoteServerConfig.ExportPath
protocol := server.Protocol
host := getNacosServiceFullHost(groupName, namespace, serviceName)
return func(services []model.Instance) {
defer w.UpdateService()
mcpServerLog.Infof("callback for %s/%s, serviceName : %s", configGroup, dataId, host)
configKey := strings.Join([]string{w.Name, w.NacosNamespace, configGroup, dataId}, DefaultJoiner)
if len(services) == 0 {
mcpServerLog.Errorf("callback for %s return empty service instance list, skip generate config", host)
return
}
serviceEntry := w.generateServiceEntry(host, services)
se := &config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.ServiceEntry,
Name: fmt.Sprintf("%s-%s-%s", provider.IstioMcpAutoGeneratedSeName, configGroup, strings.TrimSuffix(dataId, ".json")),
Namespace: "mcp",
},
Spec: serviceEntry,
}
if protocol == provider.McpSSEProtocol {
destinationRule := w.generateDrForSSEService(host)
dr := &config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.DestinationRule,
Name: fmt.Sprintf("%s-%s-%s", provider.IstioMcpAutoGeneratedDrName, configGroup, strings.TrimSuffix(dataId, ".json")),
Namespace: w.namespace,
},
Spec: destinationRule,
}
w.cache.UpdateConfigCache(gvk.DestinationRule, configKey, dr, false)
}
w.cache.UpdateConfigCache(gvk.ServiceEntry, configKey, se, false)
vs := w.buildVirtualServiceForMcpServer(serviceEntry, configGroup, dataId, path, server)
w.cache.UpdateConfigCache(gvk.VirtualService, configKey, vs, false)
}
}
func (w *watcher) buildVirtualServiceForMcpServer(serviceentry *v1alpha3.ServiceEntry, group, dataId, path string, server *provider.McpServer) *config.Config {
if serviceentry == nil {
return nil
}
hosts := w.McpServerExportDomains
if len(hosts) == 0 {
hosts = []string{"*"}
}
var gateways []string
for _, host := range hosts {
cleanHost := common2.CleanHost(host)
// namespace/name, name format: (istio cluster id)-host
gateways = append(gateways, w.namespace+"/"+
common2.CreateConvertedName(w.clusterId, cleanHost),
common2.CreateConvertedName(constants.IstioIngressGatewayName, cleanHost))
}
routeName := fmt.Sprintf("%s-%s-%s", provider.IstioMcpAutoGeneratedHttpRouteName, group, strings.TrimSuffix(dataId, ".json"))
mergePath := "/" + server.Name
if w.McpServerBaseUrl != "/" {
mergePath = strings.TrimSuffix(w.McpServerBaseUrl, "/") + mergePath
}
if path != "/" {
mergePath = mergePath + "/" + strings.TrimPrefix(path, "/")
}
vs := &v1alpha3.VirtualService{
Hosts: hosts,
Gateways: gateways,
Http: []*v1alpha3.HTTPRoute{{
Name: routeName,
Match: []*v1alpha3.HTTPMatchRequest{{
Uri: &v1alpha3.StringMatch{
MatchType: &v1alpha3.StringMatch_Prefix{
Prefix: mergePath,
},
},
}},
Route: []*v1alpha3.HTTPRouteDestination{{
Destination: &v1alpha3.Destination{
Host: serviceentry.Hosts[0],
Port: &v1alpha3.PortSelector{
Number: serviceentry.Ports[0].Number,
},
},
}},
}},
}
if server.Protocol == provider.McpStreambleProtocol {
vs.Http[0].Rewrite = &v1alpha3.HTTPRewrite{
Uri: path,
}
}
mcpServerLog.Debugf("construct virtualservice %v", vs)
return &config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.VirtualService,
Name: fmt.Sprintf("%s-%s-%s", provider.IstioMcpAutoGeneratedVsName, group, dataId),
Namespace: w.namespace,
},
Spec: vs,
}
}
func (w *watcher) generateServiceEntry(host string, services []model.Instance) *v1alpha3.ServiceEntry {
portList := make([]*v1alpha3.ServicePort, 0)
endpoints := make([]*v1alpha3.WorkloadEntry, 0)
isDnsService := false
for _, service := range services {
protocol := common.HTTP
if service.Metadata != nil && service.Metadata["protocol"] != "" {
protocol = common.ParseProtocol(service.Metadata["protocol"])
}
port := &v1alpha3.ServicePort{
Name: protocol.String(),
Number: uint32(service.Port),
Protocol: protocol.String(),
}
if len(portList) == 0 {
portList = append(portList, port)
}
if !isValidIP(service.Ip) {
isDnsService = true
}
endpoint := &v1alpha3.WorkloadEntry{
Address: service.Ip,
Ports: map[string]uint32{port.Protocol: port.Number},
Labels: service.Metadata,
}
endpoints = append(endpoints, endpoint)
}
resolution := v1alpha3.ServiceEntry_STATIC
if isDnsService {
resolution = v1alpha3.ServiceEntry_DNS
}
se := &v1alpha3.ServiceEntry{
Hosts: []string{host},
Ports: portList,
Location: v1alpha3.ServiceEntry_MESH_INTERNAL,
Resolution: resolution,
Endpoints: endpoints,
}
return se
}
func (w *watcher) generateDrForSSEService(host string) *v1alpha3.DestinationRule {
dr := &v1alpha3.DestinationRule{
Host: host,
TrafficPolicy: &v1alpha3.TrafficPolicy{
LoadBalancer: &v1alpha3.LoadBalancerSettings{
LbPolicy: &v1alpha3.LoadBalancerSettings_ConsistentHash{
ConsistentHash: &v1alpha3.LoadBalancerSettings_ConsistentHashLB{
HashKey: &v1alpha3.LoadBalancerSettings_ConsistentHashLB_UseSourceIp{
UseSourceIp: true,
},
},
},
},
},
}
return dr
}
func parseMcpArgs(args interface{}) (*provider.ToolArgs, error) {
argsData, err := json.Marshal(args)
if err != nil {
return nil, err
}
toolArgs := &provider.ToolArgs{}
if err = json.Unmarshal(argsData, toolArgs); err != nil {
return nil, err
}
return toolArgs, nil
}
func getArgsPositionFromToolMeta(toolMeta *provider.ToolsMeta) (map[string]string, error) {
result := map[string]string{}
if toolMeta == nil {
return result, nil
}
toolTemplate := toolMeta.Templates
for kind, meta := range toolTemplate {
switch kind {
case provider.JsonGoTemplateType:
templateData, err := json.Marshal(meta)
if err != nil {
return result, err
}
template := &provider.JsonGoTemplate{}
if err = json.Unmarshal(templateData, template); err != nil {
return result, err
}
result = mergeMaps(result, template.ArgsPosition)
default:
return result, fmt.Errorf("unsupport tool meta type %v", kind)
}
}
return result, nil
}
func getRequestTemplateFromToolMeta(toolMeta *provider.ToolsMeta) (*provider.RequestTemplate, error) {
if toolMeta == nil {
return nil, nil
}
toolTemplate := toolMeta.Templates
for kind, meta := range toolTemplate {
switch kind {
case provider.JsonGoTemplateType:
templateData, err := json.Marshal(meta)
if err != nil {
return nil, err
}
template := &provider.JsonGoTemplate{}
if err = json.Unmarshal(templateData, template); err != nil {
return nil, err
}
return &template.RequestTemplate, nil
default:
return nil, fmt.Errorf("unsupport tool meta type")
}
}
return nil, nil
}
func getResponseTemplateFromToolMeta(toolMeta *provider.ToolsMeta) (*provider.ResponseTemplate, error) {
if toolMeta == nil {
return nil, nil
}
toolTemplate := toolMeta.Templates
for kind, meta := range toolTemplate {
switch kind {
case provider.JsonGoTemplateType:
templateData, err := json.Marshal(meta)
if err != nil {
return nil, err
}
template := &provider.JsonGoTemplate{}
if err = json.Unmarshal(templateData, template); err != nil {
return nil, err
}
return &template.ResponseTemplate, nil
default:
return nil, fmt.Errorf("unsupport tool meta type")
}
}
return nil, nil
}
func mergeMaps(maps ...map[string]string) map[string]string {
if len(maps) == 0 {
return nil
}
res := make(map[string]string, len(maps[0]))
for _, m := range maps {
for k, v := range m {
res[k] = v
}
}
return res
}
func getNacosServiceFullHost(groupName, namespace, serviceName string) string {
suffix := strings.Join([]string{groupName, namespace, string(provider.Nacos)}, common.DotSeparator)
host := strings.Join([]string{serviceName, suffix}, common.DotSeparator)
return host
}
func (w *watcher) Stop() {
w.mutex.Lock()
defer w.mutex.Unlock()
mcpServerLog.Infof("unsubscribe all configs")
for key := range w.watchingConfig {
s := strings.Split(key, DefaultJoiner)
err := w.unsubscribe(s[0], s[1])
if err == nil {
delete(w.watchingConfig, key)
}
}
mcpServerLog.Infof("stop all service nameing client")
for _, client := range w.serviceCache {
// TODO: This is a temporary implementation because of a bug in the nacos-go-sdk, which causes a block when stoping.
go client.Stop()
}
w.isStop = true
mcpServerLog.Infof("stop all config client")
mcpServerLog.Infof("watcher %v stop", w.Name)
close(w.stop)
w.Ready(false)
}
func (w *watcher) IsHealthy() bool {
return w.Status == provider.Healthy
}
func (w *watcher) GetRegistryType() string {
return w.RegistryType.String()
}
func isValidIP(ipStr string) bool {
ip := net.ParseIP(ipStr)
return ip != nil
}