in dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java [131:230]
/**
 * Handles a service-instances-changed event and rebuilds the per-service URL view.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Ignore the event when this listener is destroyed, the event is not accepted,
 *       or it is a retry that has expired.</li>
 *   <li>Refresh the instances from the event, then group every known instance by its
 *       exported-services revision (instances with a null or empty revision are skipped).</li>
 *   <li>For each revision, reuse metadata already attached to one of its instances when the
 *       revision matches; otherwise ask {@code serviceDiscovery} for the remote metadata.
 *       The metadata is parsed and written back to instances that lack it or hold another
 *       revision.</li>
 *   <li>If {@code hasEmptyMetadata(...)} reports a count equal to the number of revisions,
 *       log an error, submit a retry task and stop without notifying.</li>
 *   <li>Otherwise build the new service-URL mapping, publish it, notify listeners, and
 *       submit a retry task when the count was non-zero.</li>
 * </ol>
 *
 * @param event the change event to process
 */
private synchronized void doOnEvent(ServiceInstancesChangedEvent event) {
    // Guard clauses, evaluated in the same order as a short-circuit OR.
    if (destroyed.get()) {
        return;
    }
    if (!accept(event)) {
        return;
    }
    if (isRetryAndExpired(event)) {
        return;
    }

    refreshInstance(event);

    if (logger.isDebugEnabled()) {
        logger.debug(event.getServiceInstances().toString());
    }

    Map<String, List<ServiceInstance>> instancesByRevision = new HashMap<>();
    Map<ServiceInfo, Set<String>> revisionsByService = new HashMap<>();

    // Bucket all instances of this app (service name) by revision.
    for (List<ServiceInstance> appInstances : allInstances.values()) {
        for (ServiceInstance candidate : appInstances) {
            String candidateRevision = getExportedServicesRevision(candidate);
            boolean usable = candidateRevision != null && !EMPTY_REVISION.equals(candidateRevision);
            if (usable) {
                instancesByRevision
                        .computeIfAbsent(candidateRevision, key -> new LinkedList<>())
                        .add(candidate);
            } else if (logger.isDebugEnabled()) {
                logger.debug("Find instance without valid service metadata: " + candidate.getAddress());
            }
        }
    }

    // Resolve the MetadataInfo that belongs to each revision.
    for (Map.Entry<String, List<ServiceInstance>> group : instancesByRevision.entrySet()) {
        String groupRevision = group.getKey();
        List<ServiceInstance> members = group.getValue();

        // Prefer the first metadata already attached to a member with a matching revision.
        MetadataInfo resolved = null;
        for (ServiceInstance member : members) {
            MetadataInfo attached = member.getServiceMetadata();
            if (attached != null && groupRevision.equals(attached.getRevision())) {
                resolved = attached;
                break;
            }
        }
        // Nothing usable locally: fall back to the remote lookup.
        if (resolved == null) {
            resolved = serviceDiscovery.getRemoteMetadata(groupRevision, members);
        }

        parseMetadata(groupRevision, resolved, revisionsByService);

        // Propagate the metadata to members that have none or a different revision,
        // in case new instances were created.
        for (ServiceInstance member : members) {
            MetadataInfo current = member.getServiceMetadata();
            boolean outdated =
                    current == null || !Objects.equals(current.getRevision(), resolved.getRevision());
            if (outdated) {
                member.setServiceMetadata(resolved);
            }
        }
    }

    int emptyCount = hasEmptyMetadata(instancesByRevision);
    hasEmptyMetadata = emptyCount != 0;
    // Bail out when all metadata is empty; this notification will not take effect.
    if (emptyCount != 0 && emptyCount == instancesByRevision.size()) {
        // 1-17 - Address refresh failed.
        logger.error(
                REGISTRY_FAILED_REFRESH_ADDRESS,
                "metadata Server failure",
                "",
                "Address refresh failed because of Metadata Server failure, wait for retry or new address refresh event.");
        submitRetryTask(event);
        return;
    }

    // protocol -> port -> revision set -> urls; lets services sharing the same key reuse one result.
    Map<String, Map<Integer, Map<Set<String>, Object>>> urlsByProtocolPortRevisions = new HashMap<>();
    Map<String, List<ProtocolServiceKeyWithUrls>> refreshedServiceUrls = new HashMap<>();
    for (Map.Entry<ServiceInfo, Set<String>> mapping : revisionsByService.entrySet()) {
        ServiceInfo service = mapping.getKey();
        Set<String> serviceRevisions = mapping.getValue();

        Object urls = urlsByProtocolPortRevisions
                .computeIfAbsent(service.getProtocol(), key -> new HashMap<>())
                .computeIfAbsent(service.getPort(), key -> new HashMap<>())
                .computeIfAbsent(
                        serviceRevisions,
                        key -> getServiceUrlsCache(
                                instancesByRevision,
                                serviceRevisions,
                                service.getProtocol(),
                                service.getPort()));

        refreshedServiceUrls
                .computeIfAbsent(service.getPath(), key -> new LinkedList<>())
                .add(new ProtocolServiceKeyWithUrls(service.getProtocolServiceKey(), (List<URL>) urls));
    }

    this.serviceUrls = refreshedServiceUrls;
    this.notifyAddressChanged();

    // Some revisions still had empty metadata: schedule another attempt.
    if (hasEmptyMetadata) {
        submitRetryTask(event);
    }
}