func()

in pkg/core/registry/service_instances_changed_listener_impl.go [61:169]


// OnEvent processes a ServiceInstancesChangedEvent.
//
// Events of any other type are ignored and nil is returned. For a matching
// event the method:
//
//  1. groups the reported instances by their exported-services revision and
//     resolves the metadata of every revision, fetching it through
//     GetMetadataInfo when lstn.ctx has no entry for it;
//  2. builds the URL list of every service found in that metadata and hands
//     it to lstn.ctx.AddServiceUrls;
//  3. sends an EventTypeAdd event per known URL to every registered listener;
//  4. sends EventTypeDel events for the instances returned by
//     findInstancesToDelete and removes them from lstn.ctx;
//  5. stores ce.Instances as the new snapshot for ce.ServiceName.
//
// The only error returned is the one produced by GetMetadataInfo; in that case
// processing stops immediately and no listener is notified.
func (lstn *DubboSDNotifyListener) OnEvent(e observer.Event) error {
	ce, ok := e.(*registry.ServiceInstancesChangedEvent)
	if !ok {
		return nil
	}

	revisionToInstances := make(map[string][]registry.ServiceInstance)
	localServiceToRevisions := make(map[*info.ServiceInfo]*gxset.HashSet)
	protocolRevisionsToUrls := make(map[string]map[*gxset.HashSet][]*common.URL)
	newServiceURLs := make(map[string][]*common.URL)

	logger.Infof("Received instance notification event of service %s, instance list size %d", ce.ServiceName, len(ce.Instances))

	for _, instance := range ce.Instances {
		oldRevision := lstn.ctx.GetOldRevision(instance)
		metadata := instance.GetMetadata()
		if metadata == nil {
			logger.Warnf("Instance metadata is nil: %s", instance.GetHost())
			continue
		}
		revision := metadata[dubboconstant.ExportedServicesRevisionPropertyName]
		if revision == "0" {
			logger.Infof("Find instance without valid service metadata: %s", instance.GetHost())
			continue
		}
		// append works on a nil slice, so no explicit allocation is needed.
		// The previous make(..., 8) produced eight leading nil entries in
		// every bucket.
		revisionToInstances[revision] = append(revisionToInstances[revision], instance)

		metadataInfo := lstn.ctx.GetRevisionToMetadata(revision)
		if metadataInfo == nil {
			logger.Infof("Start to fetch metadata from remote for app %s instance %s with revision %s ......", instance.GetServiceName(), instance.GetAddress(), revision)
			var err error
			metadataInfo, err = GetMetadataInfo(instance, revision)
			if err != nil {
				logger.Errorf("Fetch metadata from remote error for revision %s, error detail is %v", revision, err)
				return err
			}
			if metadataInfo == nil {
				// Guard against a (nil, nil) result; dereferencing it below
				// would panic.
				logger.Warnf("Fetched nil metadata for app %s instance %s with revision %s", instance.GetServiceName(), instance.GetAddress(), revision)
				continue
			}
		}
		instance.SetServiceMetadata(metadataInfo)
		for _, service := range metadataInfo.Services {
			if localServiceToRevisions[service] == nil {
				localServiceToRevisions[service] = gxset.NewSet()
			}
			localServiceToRevisions[service].Add(revision)
		}
		lstn.ctx.UpdateRevisionToMetadata(oldRevision, revision, metadataInfo)
	}
	lstn.ctx.AddAllInstances(ce.ServiceName, ce.Instances)

	for serviceInfo, revisions := range localServiceToRevisions {
		revisionsToUrls := protocolRevisionsToUrls[serviceInfo.Protocol]
		if revisionsToUrls == nil {
			revisionsToUrls = make(map[*gxset.HashSet][]*common.URL)
			protocolRevisionsToUrls[serviceInfo.Protocol] = revisionsToUrls
		}
		// NOTE(review): this cache is keyed by the set's pointer and every
		// service gets its own set above, so the lookup looks like it can
		// never hit within one call - confirm whether keying by set content
		// was intended.
		urls := revisionsToUrls[revisions]
		if urls == nil {
			urls = make([]*common.URL, 0, 8)
			for _, v := range revisions.Values() {
				r := v.(string)
				for _, i := range revisionToInstances[r] {
					urls = append(urls, i.ToURLs(serviceInfo)...)
				}
			}
			revisionsToUrls[revisions] = urls
		}
		newServiceURLs[serviceInfo.Name] = urls
		// NOTE(review): called on every iteration with the partially built
		// map, as in the original; confirm whether a single call after the
		// loop is what AddServiceUrls expects.
		lstn.ctx.AddServiceUrls(newServiceURLs)
	}

	for key, notifyListener := range lstn.listeners {
		urls := lstn.ctx.GetServiceUrls()[key]
		events := make([]*registry.ServiceEvent, 0, len(urls))
		for _, url := range urls {
			url.SetParam(consts.RegistryType, consts.RegistryInstance)
			events = append(events, &registry.ServiceEvent{
				Action:  remoting.EventTypeAdd,
				Service: url,
			})
		}
		notifyListener.NotifyAll(events, func() {})
	}

	for _, instance := range findInstancesToDelete(lstn.localAllInstances[ce.ServiceName], ce.Instances) {
		// Indexing a nil metadata map is safe in Go and yields "".
		revision := instance.GetMetadata()[dubboconstant.ExportedServicesRevisionPropertyName]
		metadataInfo := lstn.ctx.GetRevisionToMetadata(revision)
		if metadataInfo == nil {
			// The revision's metadata may already be gone, e.g. removed below
			// for an earlier instance sharing the same revision. Skip the URL
			// clean-up instead of dereferencing nil.
			logger.Warnf("No metadata found for revision %s of removed instance %s", revision, instance.GetHost())
		} else {
			for _, v := range metadataInfo.Services {
				// A service may have no registered listener; its URLs are
				// still removed from the context.
				notifyListener, hasListener := lstn.listeners[v.Name]
				for _, url := range instance.ToURLs(v) {
					lstn.ctx.DeleteServiceUrl(v.Name, url)
					if !hasListener {
						continue
					}
					notifyListener.Notify(&registry.ServiceEvent{
						Action:  remoting.EventTypeDel,
						Service: url,
					})
				}
			}
		}
		lstn.ctx.DeleteRevisionToMetadata(revision)
		lstn.ctx.DeleteAllInstance(ce.ServiceName, instance)
	}

	lstn.localAllInstances[ce.ServiceName] = ce.Instances
	return nil
}