in client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java [1060:1148]
/**
 * Sends one batch listen request per task and applies the server's answer to the local caches.
 *
 * <p>For every entry of {@code listenCachesMap} a job is submitted to the executor returned by
 * {@code ensureSyncExecutor(taskId)}. Each job resets the "receive notify changed" flag of its
 * caches, sends a {@link ConfigBatchListenRequest} with {@code listen=true}, refreshes every
 * config the server reports as changed, refreshes every cache that was flagged as changed while
 * the request was in flight, and finally marks the remaining caches as consistent with the server.
 * This method blocks until all submitted jobs have finished.
 *
 * <p>A failure inside a job is logged; the job then sleeps 50 ms and calls
 * {@code notifyListenConfig()} (presumably to trigger another listen round; verify against its
 * definition).
 *
 * @param listenCachesMap caches to listen on, grouped by task id
 * @return {@code true} if at least one successful response contained changed configs
 * @throws NacosException declared by the signature; presumably raised while resolving the rpc
 *                        client for a task (confirm against {@code ensureRpcClient})
 */
private boolean checkListenCache(Map<String, List<CacheData>> listenCachesMap) throws NacosException {
    final AtomicBoolean hasChangedKeys = new AtomicBoolean(false);
    if (listenCachesMap.isEmpty()) {
        return hasChangedKeys.get();
    }

    List<Future<?>> listenFutures = new ArrayList<>();
    for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
        String taskId = entry.getKey();
        RpcClient rpcClient = ensureRpcClient(taskId);
        ExecutorService executorService = ensureSyncExecutor(taskId);
        Future<?> future = executorService.submit(() -> {
            List<CacheData> listenCaches = entry.getValue();
            // Reset the notify-change flag so that only notifications arriving after this point are seen below.
            for (CacheData cacheData : listenCaches) {
                cacheData.getReceiveNotifyChanged().set(false);
            }
            ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
            configChangeListenRequest.setListen(true);
            try {
                ConfigChangeBatchListenResponse listenResponse = (ConfigChangeBatchListenResponse) requestProxy(
                        rpcClient, configChangeListenRequest);
                if (listenResponse != null && listenResponse.isSuccess()) {
                    Set<String> changeKeys = new HashSet<>();
                    List<ConfigChangeBatchListenResponse.ConfigContext> changedConfigs =
                            listenResponse.getChangedConfigs();
                    // Handle the keys the server reported as changed and notify listeners.
                    if (!CollectionUtils.isEmpty(changedConfigs)) {
                        hasChangedKeys.set(true);
                        for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : changedConfigs) {
                            String changeKey = GroupKey.getKeyTenant(changeConfig.getDataId(),
                                    changeConfig.getGroup(), changeConfig.getTenant());
                            changeKeys.add(changeKey);
                            CacheData changedCache = cacheMap.get().get(changeKey);
                            if (changedCache == null) {
                                // The cache is no longer registered; skip it instead of failing
                                // the whole batch with a NullPointerException.
                                continue;
                            }
                            boolean isInitializing = changedCache.isInitializing();
                            refreshContentAndCheck(rpcClient, changeKey, !isInitializing);
                        }
                    }
                    // Caches flagged as changed while the request was in flight, but not reported by the server.
                    for (CacheData cacheData : listenCaches) {
                        if (cacheData.getReceiveNotifyChanged().get()) {
                            String changeKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group,
                                    cacheData.getTenant());
                            if (!changeKeys.contains(changeKey)) {
                                CacheData notifiedCache = cacheMap.get().get(changeKey);
                                if (notifiedCache == null) {
                                    // Same guard as above: the entry is gone from the cache map.
                                    continue;
                                }
                                boolean isInitializing = notifiedCache.isInitializing();
                                refreshContentAndCheck(rpcClient, changeKey, !isInitializing);
                            }
                        }
                    }
                    // Mark the caches the server did not report as changed.
                    for (CacheData cacheData : listenCaches) {
                        cacheData.setInitializing(false);
                        String groupKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group,
                                cacheData.getTenant());
                        if (!changeKeys.contains(groupKey)) {
                            synchronized (cacheData) {
                                // Only consistent if no change notification arrived since the flag was reset.
                                if (!cacheData.getReceiveNotifyChanged().get()) {
                                    cacheData.setConsistentWithServer(true);
                                }
                            }
                        }
                    }
                }
            } catch (Throwable e) {
                LOGGER.error("Execute listen config change error ", e);
                boolean interrupted = false;
                try {
                    Thread.sleep(50L);
                } catch (InterruptedException ignored) {
                    // Remember the interruption; it is restored after the retry has been requested.
                    interrupted = true;
                }
                notifyListenConfig();
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        listenFutures.add(future);
    }

    // Wait for every task; a failure of one task must not prevent waiting for the others.
    for (Future<?> future : listenFutures) {
        try {
            future.get();
        } catch (Throwable throwable) {
            LOGGER.error("Async listen config change error ", throwable);
        }
    }
    return hasChangedKeys.get();
}