registry/servicediscovery/service_discovery_registry.go (306 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package servicediscovery
import (
"bytes"
"strings"
"sync"
"time"
)
import (
gxset "github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/log/logger"
perrors "github.com/pkg/errors"
)
import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/metadata"
"dubbo.apache.org/dubbo-go/v3/metadata/info"
"dubbo.apache.org/dubbo-go/v3/metadata/mapping"
"dubbo.apache.org/dubbo-go/v3/metadata/report"
"dubbo.apache.org/dubbo-go/v3/metrics"
metricsMetadata "dubbo.apache.org/dubbo-go/v3/metrics/metadata"
metricsRegistry "dubbo.apache.org/dubbo-go/v3/metrics/registry"
"dubbo.apache.org/dubbo-go/v3/registry"
_ "dubbo.apache.org/dubbo-go/v3/registry/servicediscovery/customizer"
"dubbo.apache.org/dubbo-go/v3/registry/servicediscovery/synthesizer"
)
func init() {
extension.SetRegistry(constant.ServiceRegistryProtocol, newServiceDiscoveryRegistry)
}
// serviceDiscoveryRegistry is the implementation of application-level registry.
// It's completely different from other registry implementations
// This implementation is based on ServiceDiscovery abstraction and ServiceNameMapping and metadata
// In order to keep compatible with interface-level registry,
// serviceDiscoveryRegistry = ServiceDiscovery + metadata
type serviceDiscoveryRegistry struct {
lock sync.RWMutex
url *common.URL
serviceDiscovery registry.ServiceDiscovery
instance registry.ServiceInstance
serviceNameMapping mapping.ServiceNameMapping
metadataReport report.MetadataReport
serviceListeners map[string]registry.ServiceInstancesChangedListener
serviceMappingListeners map[string]mapping.MappingListener
}
func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) {
serviceDiscovery, err := extension.GetServiceDiscovery(url)
if err != nil {
return nil, perrors.WithMessage(err, "Create service discovery failed")
}
return &serviceDiscoveryRegistry{
url: url,
serviceDiscovery: serviceDiscovery,
serviceNameMapping: extension.GetGlobalServiceNameMapping(),
metadataReport: metadata.GetMetadataReportByRegistry(url.GetParam(constant.RegistryIdKey, "")),
serviceListeners: make(map[string]registry.ServiceInstancesChangedListener),
// cache for mapping listener
serviceMappingListeners: make(map[string]mapping.MappingListener),
}, nil
}
func (s *serviceDiscoveryRegistry) RegisterService() error {
metaInfo := metadata.GetMetadataInfo(s.url.GetParam(constant.RegistryIdKey, ""))
if metaInfo == nil {
panic("no metada info found of registry id " + s.url.GetParam(constant.RegistryIdKey, ""))
}
s.instance = createInstance(metaInfo)
// consumer has no host and port, so it will not register service
if s.instance.GetHost() != "" && s.instance.GetPort() != 0 {
metaInfo.CalAndGetRevision()
if metadata.GetMetadataType() == constant.RemoteMetadataStorageType {
if s.metadataReport == nil {
return perrors.New("can not publish app metadata cause report instance not found")
}
err := s.metadataReport.PublishAppMetadata(metaInfo.App, metaInfo.Revision, metaInfo)
if err != nil {
return err
}
}
return s.serviceDiscovery.Register(s.instance)
}
return nil
}
func createInstance(meta *info.MetadataInfo) registry.ServiceInstance {
params := make(map[string]string, 8)
params[constant.MetadataStorageTypePropertyName] = metadata.GetMetadataType()
instance := ®istry.DefaultServiceInstance{
ServiceName: meta.App,
Enable: true,
Healthy: true,
Metadata: params,
ServiceMetadata: meta,
Tag: meta.Tag,
}
for _, cus := range extension.GetCustomizers() {
cus.Customize(instance)
}
return instance
}
func (s *serviceDiscoveryRegistry) UnRegisterService() error {
return s.serviceDiscovery.Unregister(s.instance)
}
func (s *serviceDiscoveryRegistry) UnRegister(url *common.URL) error {
if !shouldRegister(url) {
return nil
}
return nil
}
func (s *serviceDiscoveryRegistry) UnSubscribe(url *common.URL, listener registry.NotifyListener) error {
if !shouldSubscribe(url) {
return nil
}
services := s.getServices(url, nil)
if services == nil {
return nil
}
// FIXME ServiceNames.String() is not good
serviceNamesKey := services.String()
l := s.serviceListeners[serviceNamesKey]
l.RemoveListener(url.ServiceKey())
s.stopListen(url)
err := s.serviceNameMapping.Remove(url)
if err != nil {
return err
}
return nil
}
func parseServices(literalServices string) *gxset.HashSet {
set := gxset.NewSet()
if len(literalServices) == 0 {
return set
}
var splitServices = strings.Split(literalServices, ",")
for _, s := range splitServices {
if len(s) != 0 {
set.Add(s)
}
}
return set
}
func (s *serviceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery {
return s.serviceDiscovery
}
func (s *serviceDiscoveryRegistry) GetURL() *common.URL {
return s.url
}
func (s *serviceDiscoveryRegistry) IsAvailable() bool {
if s.serviceDiscovery.GetServices() == nil {
return false
}
return len(s.serviceDiscovery.GetServices().Values()) > 0
}
func (s *serviceDiscoveryRegistry) Destroy() {
err := s.serviceDiscovery.Destroy()
if err != nil {
logger.Errorf("destroy serviceDiscovery catch error:%s", err.Error())
}
}
func (s *serviceDiscoveryRegistry) Register(url *common.URL) error {
if !shouldRegister(url) {
return nil
}
common.HandleRegisterIPAndPort(url)
if id, exist := s.url.GetNonDefaultParam(constant.RegistryIdKey); exist {
metadata.AddService(id, url)
}
metrics.Publish(metricsRegistry.NewServerRegisterEvent(true, time.Now()))
return s.serviceNameMapping.Map(url)
}
func shouldRegister(url *common.URL) bool {
side := url.GetParam(constant.SideKey, "")
if side == constant.SideProvider {
return true
}
logger.Debugf("The URL should not be register.", url.String())
return false
}
func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) error {
if !shouldSubscribe(url) {
logger.Infof("Service %s is set to not subscribe to instances.", url.ServiceKey())
return nil
}
if id, exist := s.url.GetNonDefaultParam(constant.RegistryIdKey); exist {
metadata.AddSubscribeURL(id, url)
}
mappingListener := NewMappingListener(s.url, url, parseServices(url.GetParam(constant.ProvidedBy, "")), notify)
services := s.getServices(url, mappingListener)
if services.Empty() {
logger.Infof("Should has at least one way to know which services this interface belongs to,"+
" either specify 'provided-by' for reference or enable metadata-report center subscription url:%s", url.String())
} else {
logger.Infof("Find initial mapping applications %q for service %s.", services, url.ServiceKey())
// first notify
err := mappingListener.OnEvent(registry.NewServiceMappingChangedEvent(url.ServiceKey(), services))
if err != nil {
logger.Errorf("[ServiceDiscoveryRegistry] ServiceInstancesChangedListenerImpl handle error:%v", err)
}
}
return nil
}
func (s *serviceDiscoveryRegistry) SubscribeURL(url *common.URL, notify registry.NotifyListener, services *gxset.HashSet) {
// FIXME ServiceNames.String() is not good
var err error
serviceNamesKey := services.String()
protocol := constant.TriProtocol // consume "tri" protocol by default, other protocols need to be specified on reference/consumer explicitly
if url.Protocol != "" {
protocol = url.Protocol
}
protocolServiceKey := url.ServiceKey() + ":" + protocol
listener := s.serviceListeners[serviceNamesKey]
if listener == nil {
listener = NewServiceInstancesChangedListener(url.GetParam(constant.ApplicationKey, ""), services)
for _, serviceNameTmp := range services.Values() {
serviceName := serviceNameTmp.(string)
instances := s.serviceDiscovery.GetInstances(serviceName)
logger.Infof("Synchronized instance notification on application %s subscription, instance list size %s", serviceName, len(instances))
err = listener.OnEvent(®istry.ServiceInstancesChangedEvent{
ServiceName: serviceName,
Instances: instances,
})
if err != nil {
logger.Warnf("[ServiceDiscoveryRegistry] ServiceInstancesChangedListenerImpl handle error:%v", err)
}
}
}
s.serviceListeners[serviceNamesKey] = listener
listener.AddListenerAndNotify(protocolServiceKey, notify)
event := metricsMetadata.NewMetadataMetricTimeEvent(metricsMetadata.SubscribeServiceRt)
logger.Infof("Start subscribing to registry for applications :%s with a new go routine.", serviceNamesKey)
go func() {
err = s.serviceDiscovery.AddListener(listener)
event.Succ = err != nil
event.End = time.Now()
event.Attachment[constant.InterfaceKey] = url.Interface()
metrics.Publish(event)
metrics.Publish(metricsRegistry.NewServerSubscribeEvent(err == nil))
if err != nil {
logger.Errorf("add instance listener catch error,url:%s err:%s", url.String(), err.Error())
}
}()
}
// LoadSubscribeInstances load subscribe instance
func (s *serviceDiscoveryRegistry) LoadSubscribeInstances(url *common.URL, notify registry.NotifyListener) error {
return nil
}
func getUrlKey(url *common.URL) string {
var bf bytes.Buffer
if len(url.Protocol) != 0 {
bf.WriteString(url.Protocol)
bf.WriteString("://")
}
if len(url.Location) != 0 {
bf.WriteString(url.Location)
bf.WriteString(":")
bf.WriteString(url.Port)
}
if len(url.Path) != 0 {
bf.WriteString("/")
bf.WriteString(url.Path)
}
bf.WriteString("?")
appendParam(bf, constant.VersionKey, url)
appendParam(bf, constant.GroupKey, url)
appendParam(bf, constant.NacosProtocolKey, url)
return bf.String()
}
func appendParam(buffer bytes.Buffer, paramKey string, url *common.URL) {
buffer.WriteString(paramKey)
buffer.WriteString("=")
buffer.WriteString(url.GetParam(paramKey, ""))
}
func (s *serviceDiscoveryRegistry) synthesizeSubscribedURLs(subscribedURL *common.URL, serviceInstances []registry.ServiceInstance) []*common.URL {
var urls []*common.URL
for _, syn := range synthesizer.GetAllSynthesizer() {
if syn.Support(subscribedURL) {
urls = append(urls, syn.Synthesize(subscribedURL, serviceInstances)...)
}
}
return urls
}
func shouldSubscribe(url *common.URL) bool {
return !shouldRegister(url)
}
func (s *serviceDiscoveryRegistry) getServices(url *common.URL, listener mapping.MappingListener) *gxset.HashSet {
services := gxset.NewSet()
serviceNames := url.GetParam(constant.ProvidedBy, "")
if len(serviceNames) > 0 {
services = parseServices(serviceNames)
}
if services.Empty() {
services = s.findMappedServices(url, listener)
}
return services
}
func (s *serviceDiscoveryRegistry) findMappedServices(url *common.URL, listener mapping.MappingListener) *gxset.HashSet {
serviceNames, err := s.serviceNameMapping.Get(url, listener)
if err != nil {
logger.Errorf("get service names catch error, url:%s, err:%s ", url.String(), err.Error())
return gxset.NewSet()
}
if listener != nil {
protocolServiceKey := url.ServiceKey() + ":" + url.Protocol
s.lock.Lock()
s.serviceMappingListeners[protocolServiceKey] = listener
s.lock.Unlock()
}
return serviceNames
}
func (s *serviceDiscoveryRegistry) stopListen(url *common.URL) {
protocolServiceKey := url.ServiceKey() + ":" + url.Protocol
s.lock.Lock()
listener := s.serviceMappingListeners[protocolServiceKey]
if listener != nil {
delete(s.serviceMappingListeners, protocolServiceKey)
listener.Stop()
}
s.lock.Unlock()
}