in dubbo-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/AbstractProtocol.java [126:182]
public Map<String, T> getResourceFromRemote(Set<String> resourceNames) {
try {
resourceLock.lock();
CompletableFuture<Map<String, T>> future = new CompletableFuture<>();
observeResourcesName = resourceNames;
Set<String> consumerObserveResourceNames = new HashSet<>();
if (resourceNames.isEmpty()) {
consumerObserveResourceNames.add(emptyResourceName);
} else {
consumerObserveResourceNames = resourceNames;
}
Consumer<Map<String, T>> futureConsumer = future::complete;
try {
writeLock.lock();
ConcurrentHashMapUtils.computeIfAbsent(
(ConcurrentHashMap<Set<String>, List<Consumer<Map<String, T>>>>) consumerObserveMap,
consumerObserveResourceNames,
key -> new ArrayList<>())
.add(futureConsumer);
} finally {
writeLock.unlock();
}
Set<String> resourceNamesToObserve = new HashSet<>(resourceNames);
resourceNamesToObserve.addAll(resourcesMap.keySet());
adsObserver.request(buildDiscoveryRequest(resourceNamesToObserve));
logger.info("Send xDS Observe request to remote. Resource count: " + resourceNamesToObserve.size()
+ ". Resource Type: " + getTypeUrl());
try {
Map<String, T> result = future.get();
try {
writeLock.lock();
consumerObserveMap.get(consumerObserveResourceNames).removeIf(o -> o.equals(futureConsumer));
} finally {
writeLock.unlock();
}
return result;
} catch (InterruptedException e) {
logger.error(
INTERNAL_INTERRUPTED,
"",
"",
"InterruptedException occur when request control panel. error=",
e);
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.error(PROTOCOL_FAILED_REQUEST, "", "", "Error occur when request control panel. error=", e);
}
} finally {
resourceLock.unlock();
}
return Collections.emptyMap();
}