registry/nacos/mcpserver/util.go (139 lines of code) (raw):

// Copyright (c) 2022 Alibaba Group Holding Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package mcpserver import ( "fmt" "github.com/nacos-group/nacos-sdk-go/v2/clients/config_client" "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client" "github.com/nacos-group/nacos-sdk-go/v2/model" "github.com/nacos-group/nacos-sdk-go/v2/vo" ) type MultiConfigListener struct { configClient config_client.IConfigClient onChange func(map[string]string) configCache map[string]string innerCallback func(string, string, string, string) } func NewMultiConfigListener(configClient config_client.IConfigClient, onChange func(map[string]string)) *MultiConfigListener { result := &MultiConfigListener{ configClient: configClient, configCache: make(map[string]string), onChange: onChange, } result.innerCallback = func(namespace string, group string, dataId string, content string) { result.configCache[group+DefaultJoiner+dataId] = content result.onChange(result.configCache) } return result } func (l *MultiConfigListener) StartListen(configs []vo.ConfigParam) error { for _, config := range configs { content, err := l.configClient.GetConfig(vo.ConfigParam{ DataId: config.DataId, Group: config.Group, }) if err != nil { return fmt.Errorf("get config %s/%s err: %v", config.Group, config.DataId, err) } l.configCache[config.Group+DefaultJoiner+config.DataId] = content err = l.configClient.ListenConfig(vo.ConfigParam{ DataId: config.DataId, Group: config.Group, OnChange: l.innerCallback, }) if err != nil { return fmt.Errorf("listener to config %s/%s error: %w", config.Group, config.DataId, err) } } l.onChange(l.configCache) return nil } func (l *MultiConfigListener) Stop() { l.configClient.CloseClient() } func (l *MultiConfigListener) CancelListen(configs []vo.ConfigParam) error { for _, config := range configs { if _, ok := l.configCache[config.Group+DefaultJoiner+config.DataId]; ok { err := l.configClient.CancelListenConfig(vo.ConfigParam{ DataId: config.DataId, Group: config.Group, }) if err != nil { return fmt.Errorf("cancel config %s/%s error: %w", config.Group, config.DataId, err) } delete(l.configCache, config.Group+config.DataId) } } return nil } type ServiceCache struct { services map[string]*NacosServiceRef client naming_client.INamingClient } type NacosServiceRef struct { refs map[string]func([]model.Instance) callback func(services []model.Instance, err error) instances *[]model.Instance } func NewServiceCache(client naming_client.INamingClient) *ServiceCache { return &ServiceCache{ client: client, services: make(map[string]*NacosServiceRef), } } func (c *ServiceCache) AddListener(group string, serviceName string, key string, callback func([]model.Instance)) error { uniqueServiceName := c.makeServiceUniqueName(group, serviceName) if _, ok := c.services[uniqueServiceName]; !ok { instances, err := c.client.SelectAllInstances(vo.SelectAllInstancesParam{ GroupName: group, ServiceName: serviceName, }) if err != nil { return err } ref := &NacosServiceRef{ refs: map[string]func([]model.Instance){}, instances: &instances, } ref.callback = func(services []model.Instance, err error) { ref.instances = &services for _, refCallback := range ref.refs { refCallback(*ref.instances) } } c.services[uniqueServiceName] = ref err = c.client.Subscribe(&vo.SubscribeParam{ GroupName: group, ServiceName: serviceName, SubscribeCallback: ref.callback, }) if err != nil { return err } } ref := c.services[uniqueServiceName] ref.refs[key] = callback callback(*ref.instances) return nil } func (c *ServiceCache) RemoveListener(group string, serviceName string, key string) error { if ref, ok := c.services[c.makeServiceUniqueName(group, serviceName)]; ok { delete(ref.refs, key) if len(ref.refs) == 0 { err := c.client.Unsubscribe(&vo.SubscribeParam{ GroupName: group, ServiceName: serviceName, SubscribeCallback: ref.callback, }) delete(c.services, c.makeServiceUniqueName(group, serviceName)) if err != nil { return err } } } return nil } func (c *ServiceCache) makeServiceUniqueName(group string, serviceName string) string { return fmt.Sprintf("%s-%s", group, serviceName) } func (c *ServiceCache) Stop() { c.client.CloseClient() }