in client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java [741:829]
private void initRpcClientHandler(final RpcClient rpcClientInner) {
/*
* Register Config Change /Config ReSync Handler
*/
rpcClientInner.registerServerRequestHandler((request, connection) -> {
//config change notify
if (request instanceof ConfigChangeNotifyRequest) {
return handleConfigChangeNotifyRequest((ConfigChangeNotifyRequest) request,
rpcClientInner.getName());
}
return null;
});
rpcClientInner.registerServerRequestHandler((request, connection) -> {
if (request instanceof ClientConfigMetricRequest) {
return handleClientMetricsRequest((ClientConfigMetricRequest) request);
}
return null;
});
rpcClientInner.registerServerRequestHandler(
new ClientFuzzyWatchNotifyRequestHandler(configFuzzyWatchGroupKeyHolder));
rpcClientInner.registerConnectionListener(new ConnectionEventListener() {
@Override
public void onConnected(Connection connection) {
LOGGER.info("[{}] Connected,notify listen context...", rpcClientInner.getName());
notifyListenConfig();
LOGGER.info("[{}] Connected,notify fuzzy listen context...", rpcClientInner.getName());
configFuzzyWatchGroupKeyHolder.notifyFuzzyWatchSync();
}
@Override
public void onDisConnect(Connection connection) {
String taskId = rpcClientInner.getLabels().get("taskId");
LOGGER.info("[{}] DisConnected,reset listen context", rpcClientInner.getName());
Collection<CacheData> values = cacheMap.get().values();
for (CacheData cacheData : values) {
if (StringUtils.isNotBlank(taskId)) {
if (Integer.valueOf(taskId).equals(cacheData.getTaskId())) {
cacheData.setConsistentWithServer(false);
}
} else {
cacheData.setConsistentWithServer(false);
}
}
LOGGER.info("[{}] DisConnected,reset fuzzy watch consistence status", rpcClientInner.getName());
configFuzzyWatchGroupKeyHolder.resetConsistenceStatus();
}
});
rpcClientInner.serverListFactory(new ServerListFactory() {
@Override
public String genNextServer() {
return ConfigRpcTransportClient.super.serverListManager.genNextServer();
}
@Override
public String getCurrentServer() {
return ConfigRpcTransportClient.super.serverListManager.getCurrentServer();
}
@Override
public List<String> getServerList() {
return ConfigRpcTransportClient.super.serverListManager.getServerList();
}
});
subscriber = new Subscriber() {
@Override
public void onEvent(Event event) {
rpcClientInner.onServerListChange();
}
@Override
public Class<? extends Event> subscribeType() {
return ServerListChangeEvent.class;
}
};
NotifyCenter.registerSubscriber(subscriber);
}