in pkg/core/registry/service_instances_changed_listener_impl.go [61:169]
func (lstn *DubboSDNotifyListener) OnEvent(e observer.Event) error {
ce, ok := e.(*registry.ServiceInstancesChangedEvent)
if !ok {
return nil
}
var err error
revisionToInstances := make(map[string][]registry.ServiceInstance)
localServiceToRevisions := make(map[*info.ServiceInfo]*gxset.HashSet)
protocolRevisionsToUrls := make(map[string]map[*gxset.HashSet][]*common.URL)
newServiceURLs := make(map[string][]*common.URL)
logger.Infof("Received instance notification event of service %s, instance list size %d", ce.ServiceName, len(ce.Instances))
for _, instance := range ce.Instances {
oldRevision := lstn.ctx.GetOldRevision(instance)
if instance.GetMetadata() == nil {
logger.Warnf("Instance metadata is nil: %s", instance.GetHost())
continue
}
revision := instance.GetMetadata()[dubboconstant.ExportedServicesRevisionPropertyName]
if "0" == revision {
logger.Infof("Find instance without valid service metadata: %s", instance.GetHost())
continue
}
subInstances := revisionToInstances[revision]
if subInstances == nil {
subInstances = make([]registry.ServiceInstance, 8)
}
revisionToInstances[revision] = append(subInstances, instance)
metadataInfo := lstn.ctx.GetRevisionToMetadata(revision)
if metadataInfo == nil {
logger.Infof("Start to fetch metadata from remote for app %s instance %s with revision %s ......", instance.GetServiceName(), instance.GetAddress(), revision)
metadataInfo, err = GetMetadataInfo(instance, revision)
if err != nil {
logger.Errorf("Fetch metadata from remote error for revision %s, error detail is %v", revision, err)
return err
}
}
instance.SetServiceMetadata(metadataInfo)
for _, service := range metadataInfo.Services {
if localServiceToRevisions[service] == nil {
localServiceToRevisions[service] = gxset.NewSet()
}
localServiceToRevisions[service].Add(revision)
}
lstn.ctx.UpdateRevisionToMetadata(oldRevision, revision, metadataInfo)
}
lstn.ctx.AddAllInstances(ce.ServiceName, ce.Instances)
for serviceInfo, revisions := range localServiceToRevisions {
revisionsToUrls := protocolRevisionsToUrls[serviceInfo.Protocol]
if revisionsToUrls == nil {
protocolRevisionsToUrls[serviceInfo.Protocol] = make(map[*gxset.HashSet][]*common.URL)
revisionsToUrls = protocolRevisionsToUrls[serviceInfo.Protocol]
}
urls := revisionsToUrls[revisions]
if urls != nil {
newServiceURLs[serviceInfo.Name] = urls
} else {
urls = make([]*common.URL, 0, 8)
for _, v := range revisions.Values() {
r := v.(string)
for _, i := range revisionToInstances[r] {
if i != nil {
urls = append(urls, i.ToURLs(serviceInfo)...)
}
}
}
revisionsToUrls[revisions] = urls
newServiceURLs[serviceInfo.Name] = urls
}
lstn.ctx.AddServiceUrls(newServiceURLs)
}
for key, notifyListener := range lstn.listeners {
urls := lstn.ctx.GetServiceUrls()[key]
events := make([]*registry.ServiceEvent, 0, len(urls))
for _, url := range urls {
url.SetParam(consts.RegistryType, consts.RegistryInstance)
events = append(events, ®istry.ServiceEvent{
Action: remoting.EventTypeAdd,
Service: url,
})
}
notifyListener.NotifyAll(events, func() {})
}
for _, instance := range findInstancesToDelete(lstn.localAllInstances[ce.ServiceName], ce.Instances) {
revision := instance.GetMetadata()[dubboconstant.ExportedServicesRevisionPropertyName]
metadataInfo := lstn.ctx.GetRevisionToMetadata(revision)
for _, v := range metadataInfo.Services {
notifyListener := lstn.listeners[v.Name]
for _, url := range instance.ToURLs(v) {
lstn.ctx.DeleteServiceUrl(v.Name, url)
notifyListener.Notify(®istry.ServiceEvent{
Action: remoting.EventTypeDel,
Service: url,
})
}
}
lstn.ctx.DeleteRevisionToMetadata(revision)
lstn.ctx.DeleteAllInstance(ce.ServiceName, instance)
}
lstn.localAllInstances[ce.ServiceName] = ce.Instances
return nil
}