in registry/reconcile/reconcile.go [150:285]
// generateWatcherFromRegistryConfig builds the Watcher that matches
// registry.Type and hooks it up to the reconciler.
//
// The auth option is resolved first through r.getAuthOption. If that fails,
// if registry.Type is not one of the supported kinds, or if the selected
// constructor reports an error, a nil Watcher is returned together with the
// error and wg is not touched.
//
// On success wg is incremented by one and a ready handler is installed that
// calls wg.Done exactly once, on its first invocation, whatever ready value
// is reported; an info line is logged only when that first report is true.
// r.serviceUpdate is appended as a service update handler before returning.
func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryConfig, wg *sync.WaitGroup) (Watcher, error) {
	auth, err := r.getAuthOption(registry)
	if err != nil {
		return nil, err
	}

	var w Watcher
	kind := registry.Type
	// A tagless switch lets Nacos2 and Nacos3-without-MCP share a single
	// branch. EnableMCPServer is consulted only when the type is Nacos3.
	switch {
	case kind == string(Nacos):
		w, err = nacos.NewWatcher(
			r.Cache,
			nacos.WithType(kind),
			nacos.WithName(registry.Name),
			nacos.WithDomain(registry.Domain),
			nacos.WithPort(registry.Port),
			nacos.WithNacosNamespaceId(registry.NacosNamespaceId),
			nacos.WithNacosNamespace(registry.NacosNamespace),
			nacos.WithNacosGroups(registry.NacosGroups),
			nacos.WithNacosRefreshInterval(registry.NacosRefreshInterval),
			nacos.WithAuthOption(auth),
		)

	case kind == string(Nacos3) && registry.EnableMCPServer.GetValue():
		w, err = mcpserver.NewWatcher(
			r.Cache,
			mcpserver.WithType(kind),
			mcpserver.WithName(registry.Name),
			mcpserver.WithNacosAddressServer(registry.NacosAddressServer),
			mcpserver.WithDomain(registry.Domain),
			mcpserver.WithPort(registry.Port),
			mcpserver.WithNacosAccessKey(registry.NacosAccessKey),
			mcpserver.WithNacosSecretKey(registry.NacosSecretKey),
			mcpserver.WithNacosRefreshInterval(registry.NacosRefreshInterval),
			mcpserver.WithMcpExportDomains(registry.McpServerExportDomains),
			mcpserver.WithMcpBaseUrl(registry.McpServerBaseUrl),
			mcpserver.WithEnableMcpServer(registry.EnableMCPServer),
			mcpserver.WithClusterId(r.clusterId),
			mcpserver.WithNamespace(r.namespace),
			mcpserver.WithAuthOption(auth),
		)

	case kind == string(Nacos2), kind == string(Nacos3):
		// Nacos3 lands here only when the MCP server is not enabled.
		w, err = nacosv2.NewWatcher(
			r.Cache,
			nacosv2.WithType(kind),
			nacosv2.WithName(registry.Name),
			nacosv2.WithNacosAddressServer(registry.NacosAddressServer),
			nacosv2.WithDomain(registry.Domain),
			nacosv2.WithPort(registry.Port),
			nacosv2.WithNacosAccessKey(registry.NacosAccessKey),
			nacosv2.WithNacosSecretKey(registry.NacosSecretKey),
			nacosv2.WithNacosNamespaceId(registry.NacosNamespaceId),
			nacosv2.WithNacosNamespace(registry.NacosNamespace),
			nacosv2.WithNacosGroups(registry.NacosGroups),
			nacosv2.WithNacosRefreshInterval(registry.NacosRefreshInterval),
			nacosv2.WithAuthOption(auth),
		)

	case kind == string(Zookeeper):
		w, err = zookeeper.NewWatcher(
			r.Cache,
			zookeeper.WithType(kind),
			zookeeper.WithName(registry.Name),
			zookeeper.WithDomain(registry.Domain),
			zookeeper.WithPort(registry.Port),
			zookeeper.WithZkServicesPath(registry.ZkServicesPath),
		)

	case kind == string(Consul):
		w, err = consul.NewWatcher(
			r.Cache,
			consul.WithType(kind),
			consul.WithName(registry.Name),
			consul.WithDomain(registry.Domain),
			consul.WithPort(registry.Port),
			consul.WithDatacenter(registry.ConsulDatacenter),
			consul.WithServiceTag(registry.ConsulServiceTag),
			consul.WithRefreshInterval(registry.ConsulRefreshInterval),
			consul.WithAuthOption(auth),
		)

	case kind == string(Static), kind == string(DNS):
		// Static and DNS registries are both served by the direct watcher.
		w, err = direct.NewWatcher(
			r.Cache,
			direct.WithType(kind),
			direct.WithName(registry.Name),
			direct.WithDomain(registry.Domain),
			direct.WithPort(registry.Port),
			direct.WithProtocol(registry.Protocol),
			direct.WithSNI(registry.Sni),
		)

	case kind == string(Eureka):
		w, err = eureka.NewWatcher(
			r.Cache,
			eureka.WithName(registry.Name),
			eureka.WithDomain(registry.Domain),
			eureka.WithType(kind),
			eureka.WithPort(registry.Port),
		)

	default:
		return nil, errors.New("unsupported registry type:" + kind)
	}
	if err != nil {
		return nil, err
	}

	// Register with the wait group only after construction succeeded, so
	// every Add has a matching Done from the ready handler below.
	wg.Add(1)
	var signalled sync.Once
	w.ReadyHandler(func(ready bool) {
		signalled.Do(func() {
			// Release the waiter on the first report even when not ready.
			wg.Done()
			if !ready {
				return
			}
			log.Infof("Registry Watcher is ready, type:%s, name:%s", registry.Type, registry.Name)
		})
	})
	w.AppendServiceUpdateHandler(r.serviceUpdate)
	return w, nil
}