in dubbo-admin-server/src/main/java/org/apache/dubbo/admin/service/impl/InstanceRegistryCache.java [81:121]
public synchronized void refreshConsumer(boolean refreshAll) {
if (startRefresh.compareAndSet(false, true)) {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("Consumer-Refresh"));
executorService.scheduleAtFixedRate(() -> refreshConsumer(true), 60, 60, TimeUnit.MINUTES);
}
Map<String, Map<String, List<URL>>> origin;
if (refreshAll) {
origin = new ConcurrentHashMap<>();
} else {
origin = subscribedCache;
}
Map<String, List<InstanceAddressURL>> providers = get(Constants.PROVIDERS_CATEGORY).values().stream()
.flatMap((e) -> e.values().stream())
.flatMap(Collection::stream)
.collect(Collectors.groupingBy(InstanceAddressURL::getAddress));
// remove cached
origin.keySet().forEach(providers::remove);
for (List<InstanceAddressURL> instanceAddressURLs : providers.values()) {
ServiceInstance instance = instanceAddressURLs.get(0).getInstance();
String metadataType = ServiceInstanceMetadataUtils.getMetadataStorageType(instance);
if (!REMOTE_METADATA_STORAGE_TYPE.equals(metadataType)) {
MetadataService metadataService = MetadataUtils.referProxy(instance).getProxy();
try {
Set<String> subscribedURLs = metadataService.getSubscribedURLs();
Map<String, List<URL>> subscribed = subscribedURLs.stream().map(URL::valueOf).collect(Collectors.groupingBy(URL::getServiceKey));
origin.put(instanceAddressURLs.get(0).getAddress(), subscribed);
} catch (Throwable ignored) {
}
((Destroyable) metadataService).$destroy();
}
}
}