in shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java [122:203]
private void watchConfigKeyValues(final String watchPathRoot,
final BiConsumer<String, String> updateHandler,
final Consumer<String> deleteHandler) {
try {
Long currentIndex = this.consulIndexes.get(watchPathRoot);
if (Objects.isNull(currentIndex)) {
currentIndex = ConsulConstants.INIT_CONFIG_VERSION_INDEX;
}
Response<List<GetValue>> response = this.consulClient.getKVValues(watchPathRoot, null,
new QueryParams(TimeUnit.MILLISECONDS.toSeconds(consulConfig.getWaitTime()), currentIndex));
if (Objects.isNull(response.getValue()) || response.getValue().isEmpty()) {
if (LOG.isTraceEnabled()) {
LOG.trace("No value for watchPathRoot {}", watchPathRoot);
}
this.executor.schedule(() -> watchConfigKeyValues(watchPathRoot, updateHandler, deleteHandler),
consulConfig.getWatchDelay(), TimeUnit.MILLISECONDS);
return;
}
Long newIndex = response.getConsulIndex();
if (Objects.isNull(newIndex)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Same index for watchPathRoot {}", watchPathRoot);
}
this.executor.schedule(() -> watchConfigKeyValues(watchPathRoot, updateHandler, deleteHandler),
consulConfig.getWatchDelay(), TimeUnit.MILLISECONDS);
return;
}
if (Objects.equals(newIndex, currentIndex)) {
this.executor.schedule(() -> watchConfigKeyValues(watchPathRoot, updateHandler, deleteHandler),
-1, TimeUnit.MILLISECONDS);
return;
}
if (!this.consulIndexes.containsValue(newIndex)
&& !currentIndex.equals(ConsulConstants.INIT_CONFIG_VERSION_INDEX)) {
if (LOG.isTraceEnabled()) {
LOG.trace("watchPathRoot {} has new index {}", watchPathRoot, newIndex);
}
final Long lastIndex = currentIndex;
final List<ConsulData> lastDatas = cacheConsulDataKeyMap.get(watchPathRoot);
response.getValue().forEach(data -> {
if (data.getModifyIndex() == lastIndex) {
//data has not changed
return;
}
if (Objects.nonNull(lastDatas)) {
final ConsulData consulData = lastDatas.stream()
.filter(lastData -> data.getKey().equals(lastData.getConsulKey())).findFirst().orElse(null);
if (Objects.nonNull(consulData) && !StringUtils.isBlank(consulData.getConsulDataMd5())
&& consulData.getConsulDataMd5().equals(DigestUtils.md5Hex(data.getValue()))) {
return;
}
}
updateHandler.accept(data.getKey(), data.getDecodedValue());
});
final List<String> currentKeys = response.getValue().stream().map(GetValue::getKey).collect(Collectors.toList());
if (!ObjectUtils.isEmpty(lastDatas)) {
// handler delete event
lastDatas.stream()
.map(ConsulData::getConsulKey)
.filter(lastKey -> !currentKeys.contains(lastKey))
.forEach(deleteHandler);
}
// save last Keys
cacheConsulDataKeyMap.put(watchPathRoot, response.getValue().stream().map(data -> {
final ConsulData consulData = new ConsulData();
consulData.setConsulKey(data.getKey());
consulData.setConsulDataMd5(DigestUtils.md5Hex(data.getValue()));
return consulData;
}).collect(Collectors.toList()));
} else if (LOG.isTraceEnabled()) {
LOG.info("Event for index already published for watchPathRoot {}", watchPathRoot);
}
this.consulIndexes.put(watchPathRoot, newIndex);
this.executor.schedule(() -> watchConfigKeyValues(watchPathRoot, updateHandler, deleteHandler),
-1, TimeUnit.MILLISECONDS);
} catch (Exception e) {
LOG.warn("Error querying consul Key/Values for watchPathRoot '{}'. Message: ", watchPathRoot, e);
this.executor.schedule(() -> watchConfigKeyValues(watchPathRoot, updateHandler, deleteHandler),
consulConfig.getWatchDelay(), TimeUnit.MILLISECONDS);
}
}