func()

in registry/reconcile/reconcile.go [150:285]


// generateWatcherFromRegistryConfig builds the Watcher that corresponds to
// registry.Type and hooks it up to the reconciler.
//
// The auth option is resolved first through r.getAuthOption and handed to the
// watcher constructors that accept one. A Nacos3 registry uses the mcpserver
// watcher when registry.EnableMCPServer.GetValue() reports true and the
// nacosv2 watcher otherwise; Static and DNS registries share the direct
// watcher. An unrecognised type, a failed auth lookup, or a failed constructor
// yields a nil Watcher together with the error.
//
// On success wg is incremented by one and a ready handler is registered on the
// watcher; the first invocation of that handler (whatever its argument)
// releases the wait group exactly once. r.serviceUpdate is appended as a
// service update handler before the watcher is returned.
func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryConfig, wg *sync.WaitGroup) (Watcher, error) {
	var (
		w   Watcher
		err error
	)

	auth, err := r.getAuthOption(registry)
	if err != nil {
		return nil, err
	}

	switch registry.Type {
	case string(Nacos):
		w, err = nacos.NewWatcher(
			r.Cache,
			nacos.WithType(registry.Type),
			nacos.WithName(registry.Name),
			nacos.WithDomain(registry.Domain),
			nacos.WithPort(registry.Port),
			nacos.WithNacosNamespaceId(registry.NacosNamespaceId),
			nacos.WithNacosNamespace(registry.NacosNamespace),
			nacos.WithNacosGroups(registry.NacosGroups),
			nacos.WithNacosRefreshInterval(registry.NacosRefreshInterval),
			nacos.WithAuthOption(auth),
		)
	case string(Nacos2), string(Nacos3):
		// Only a Nacos3 registry may opt into the MCP server watcher; the
		// short-circuit keeps EnableMCPServer from being consulted for Nacos2.
		if registry.Type == string(Nacos3) && registry.EnableMCPServer.GetValue() {
			w, err = mcpserver.NewWatcher(
				r.Cache,
				mcpserver.WithType(registry.Type),
				mcpserver.WithName(registry.Name),
				mcpserver.WithNacosAddressServer(registry.NacosAddressServer),
				mcpserver.WithDomain(registry.Domain),
				mcpserver.WithPort(registry.Port),
				mcpserver.WithNacosAccessKey(registry.NacosAccessKey),
				mcpserver.WithNacosSecretKey(registry.NacosSecretKey),
				mcpserver.WithNacosRefreshInterval(registry.NacosRefreshInterval),
				mcpserver.WithMcpExportDomains(registry.McpServerExportDomains),
				mcpserver.WithMcpBaseUrl(registry.McpServerBaseUrl),
				mcpserver.WithEnableMcpServer(registry.EnableMCPServer),
				mcpserver.WithClusterId(r.clusterId),
				mcpserver.WithNamespace(r.namespace),
				mcpserver.WithAuthOption(auth),
			)
			break
		}
		// Nacos2, and Nacos3 without the MCP server, use the same v2 watcher.
		w, err = nacosv2.NewWatcher(
			r.Cache,
			nacosv2.WithType(registry.Type),
			nacosv2.WithName(registry.Name),
			nacosv2.WithNacosAddressServer(registry.NacosAddressServer),
			nacosv2.WithDomain(registry.Domain),
			nacosv2.WithPort(registry.Port),
			nacosv2.WithNacosAccessKey(registry.NacosAccessKey),
			nacosv2.WithNacosSecretKey(registry.NacosSecretKey),
			nacosv2.WithNacosNamespaceId(registry.NacosNamespaceId),
			nacosv2.WithNacosNamespace(registry.NacosNamespace),
			nacosv2.WithNacosGroups(registry.NacosGroups),
			nacosv2.WithNacosRefreshInterval(registry.NacosRefreshInterval),
			nacosv2.WithAuthOption(auth),
		)
	case string(Zookeeper):
		w, err = zookeeper.NewWatcher(
			r.Cache,
			zookeeper.WithType(registry.Type),
			zookeeper.WithName(registry.Name),
			zookeeper.WithDomain(registry.Domain),
			zookeeper.WithPort(registry.Port),
			zookeeper.WithZkServicesPath(registry.ZkServicesPath),
		)
	case string(Consul):
		w, err = consul.NewWatcher(
			r.Cache,
			consul.WithType(registry.Type),
			consul.WithName(registry.Name),
			consul.WithDomain(registry.Domain),
			consul.WithPort(registry.Port),
			consul.WithDatacenter(registry.ConsulDatacenter),
			consul.WithServiceTag(registry.ConsulServiceTag),
			consul.WithRefreshInterval(registry.ConsulRefreshInterval),
			consul.WithAuthOption(auth),
		)
	case string(Static), string(DNS):
		w, err = direct.NewWatcher(
			r.Cache,
			direct.WithType(registry.Type),
			direct.WithName(registry.Name),
			direct.WithDomain(registry.Domain),
			direct.WithPort(registry.Port),
			direct.WithProtocol(registry.Protocol),
			direct.WithSNI(registry.Sni),
		)
	case string(Eureka):
		w, err = eureka.NewWatcher(
			r.Cache,
			eureka.WithName(registry.Name),
			eureka.WithDomain(registry.Domain),
			eureka.WithType(registry.Type),
			eureka.WithPort(registry.Port),
		)
	default:
		return nil, errors.New("unsupported registry type:" + registry.Type)
	}

	if err != nil {
		return nil, err
	}

	// The wait group is released on the first ready callback only, so repeated
	// callbacks cannot call Done more than once. The readiness log line is
	// emitted only if that first callback reports ready == true.
	wg.Add(1)
	var signalOnce sync.Once
	onReady := func(ready bool) {
		signalOnce.Do(func() {
			wg.Done()
			if !ready {
				return
			}
			log.Infof("Registry Watcher is ready, type:%s, name:%s", registry.Type, registry.Name)
		})
	}
	w.ReadyHandler(onReady)
	w.AppendServiceUpdateHandler(r.serviceUpdate)

	return w, nil
}