private synchronized void doOnEvent()

in dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java [131:230]


    private synchronized void doOnEvent(ServiceInstancesChangedEvent event) {
        if (destroyed.get() || !accept(event) || isRetryAndExpired(event)) {
            return;
        }

        refreshInstance(event);

        if (logger.isDebugEnabled()) {
            logger.debug(event.getServiceInstances().toString());
        }

        Map<String, List<ServiceInstance>> revisionToInstances = new HashMap<>();
        Map<ServiceInfo, Set<String>> localServiceToRevisions = new HashMap<>();

        // grouping all instances of this app(service name) by revision
        for (Map.Entry<String, List<ServiceInstance>> entry : allInstances.entrySet()) {
            List<ServiceInstance> instances = entry.getValue();
            for (ServiceInstance instance : instances) {
                String revision = getExportedServicesRevision(instance);
                if (revision == null || EMPTY_REVISION.equals(revision)) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Find instance without valid service metadata: " + instance.getAddress());
                    }
                    continue;
                }
                List<ServiceInstance> subInstances =
                        revisionToInstances.computeIfAbsent(revision, r -> new LinkedList<>());
                subInstances.add(instance);
            }
        }

        // get MetadataInfo with revision
        for (Map.Entry<String, List<ServiceInstance>> entry : revisionToInstances.entrySet()) {
            String revision = entry.getKey();
            List<ServiceInstance> subInstances = entry.getValue();

            MetadataInfo metadata = subInstances.stream()
                    .map(ServiceInstance::getServiceMetadata)
                    .filter(Objects::nonNull)
                    .filter(m -> revision.equals(m.getRevision()))
                    .findFirst()
                    .orElseGet(() -> serviceDiscovery.getRemoteMetadata(revision, subInstances));

            parseMetadata(revision, metadata, localServiceToRevisions);
            // update metadata into each instance, in case new instance created.
            for (ServiceInstance tmpInstance : subInstances) {
                MetadataInfo originMetadata = tmpInstance.getServiceMetadata();
                if (originMetadata == null || !Objects.equals(originMetadata.getRevision(), metadata.getRevision())) {
                    tmpInstance.setServiceMetadata(metadata);
                }
            }
        }

        int emptyNum = hasEmptyMetadata(revisionToInstances);
        if (emptyNum != 0) {
            hasEmptyMetadata = true;

            // return if all metadata is empty, this notification will not take effect.
            if (emptyNum == revisionToInstances.size()) {
                // 1-17 - Address refresh failed.
                logger.error(
                        REGISTRY_FAILED_REFRESH_ADDRESS,
                        "metadata Server failure",
                        "",
                        "Address refresh failed because of Metadata Server failure, wait for retry or new address refresh event.");

                submitRetryTask(event);
                return;
            }
        } else {
            hasEmptyMetadata = false;
        }

        Map<String, Map<Integer, Map<Set<String>, Object>>> protocolRevisionsToUrls = new HashMap<>();
        Map<String, List<ProtocolServiceKeyWithUrls>> newServiceUrls = new HashMap<>();
        for (Map.Entry<ServiceInfo, Set<String>> entry : localServiceToRevisions.entrySet()) {
            ServiceInfo serviceInfo = entry.getKey();
            Set<String> revisions = entry.getValue();

            Map<Integer, Map<Set<String>, Object>> portToRevisions =
                    protocolRevisionsToUrls.computeIfAbsent(serviceInfo.getProtocol(), k -> new HashMap<>());
            Map<Set<String>, Object> revisionsToUrls =
                    portToRevisions.computeIfAbsent(serviceInfo.getPort(), k -> new HashMap<>());
            Object urls = revisionsToUrls.computeIfAbsent(
                    revisions,
                    k -> getServiceUrlsCache(
                            revisionToInstances, revisions, serviceInfo.getProtocol(), serviceInfo.getPort()));

            List<ProtocolServiceKeyWithUrls> list =
                    newServiceUrls.computeIfAbsent(serviceInfo.getPath(), k -> new LinkedList<>());
            list.add(new ProtocolServiceKeyWithUrls(serviceInfo.getProtocolServiceKey(), (List<URL>) urls));
        }

        this.serviceUrls = newServiceUrls;
        this.notifyAddressChanged();

        if (hasEmptyMetadata) {
            submitRetryTask(event);
        }
    }