in dubbo-xds/src/main/java/org/apache/dubbo/rpc/cluster/router/xds/RdsRouteRuleManager.java [67:112]
private void doSubscribeRds(String domain) {
synchronized (RdsRouteRuleManager.class) {
if (RDS_LISTENER == null) {
RDS_LISTENER = rds -> {
if (rds == null) {
return;
}
for (RouteResult routeResult : rds.values()) {
for (String domainToNotify : RDS_LISTENERS.keySet()) {
VirtualHost virtualHost = routeResult.searchVirtualHost(domainToNotify);
if (virtualHost != null) {
RDS_LISTENERS.get(domainToNotify).parseVirtualHost(virtualHost);
}
}
}
RDS_RESULT = rds;
};
}
if (LDS_LISTENER == null) {
LDS_LISTENER = new Consumer<Map<String, ListenerResult>>() {
private volatile Set<String> configNames = null;
@Override
public void accept(Map<String, ListenerResult> listenerResults) {
if (listenerResults.size() == 1) {
for (ListenerResult listenerResult : listenerResults.values()) {
Set<String> newConfigNames = listenerResult.getRouteConfigNames();
if (configNames == null) {
PilotExchanger.getInstance().observeRds(newConfigNames, RDS_LISTENER);
} else if (!configNames.equals(newConfigNames)) {
PilotExchanger.getInstance().unObserveRds(configNames, RDS_LISTENER);
PilotExchanger.getInstance().observeRds(newConfigNames, RDS_LISTENER);
}
configNames = newConfigNames;
}
}
}
};
if (PilotExchanger.isEnabled()) {
PilotExchanger.getInstance().observeLds(LDS_LISTENER);
}
}
}
ConcurrentHashMapUtils.computeIfAbsent(RDS_LISTENERS, domain, key -> new RdsVirtualHostListener(domain, this));
RDS_LISTENER.accept(RDS_RESULT);
}