in mode/cluster/repository/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java [182:228]
/**
 * Watch the children stored under a key prefix and report changes to the listener.
 *
 * <p>Repeatedly queries the KV store for {@code key}, passing the last observed index.
 * The first response only seeds the set of known child keys. Each later response with a higher index
 * is diffed against that set and reported as ADDED, UPDATED or DELETED events.
 * The loop flag is never cleared inside this method.</p>
 *
 * @param key key prefix whose children are watched
 * @param listener listener notified of every detected change
 */
private void watchChildKeyChangeEvent(final String key, final DataChangedEventListener listener) {
    AtomicBoolean running = new AtomicBoolean(true);
    long currentIndex = 0;
    while (running.get()) {
        Response<List<GetValue>> response = consulClient.getKVValues(key, new QueryParams(consulProps.getValue(ConsulPropertyKey.BLOCK_QUERY_TIME_TO_SECONDS), currentIndex));
        Long index = response.getConsulIndex();
        if (null == index) {
            continue;
        }
        // NOTE(review): the client appears to return a null value when nothing exists under the prefix - confirm.
        // It is handled as "no children" so the index still advances and removals of the last child are reported.
        List<GetValue> value = response.getValue();
        if (0 == currentIndex) {
            // First response (or first after an index reset): remember the existing keys without firing events.
            currentIndex = index;
            Collection<String> watchKeys = watchKeyMap.computeIfAbsent(key, unused -> new HashSet<>());
            if (null != value) {
                for (GetValue each : value) {
                    watchKeys.add(each.getKey());
                }
            }
            continue;
        }
        if (index > currentIndex) {
            // Keep the index seen by the previous poll: a key modified after it has changed since then.
            long previousIndex = currentIndex;
            currentIndex = index;
            Collection<String> watchKeys = watchKeyMap.computeIfAbsent(key, unused -> new HashSet<>());
            Collection<String> newKeys = new HashSet<>(null == value ? 0 : value.size(), 1F);
            if (null != value) {
                for (GetValue each : value) {
                    newKeys.add(each.getKey());
                    if (!watchKeys.contains(each.getKey())) {
                        watchKeys.add(each.getKey());
                        fireDataChangeEvent(each, listener, DataChangedEvent.Type.ADDED);
                    } else if (each.getModifyIndex() > previousIndex) {
                        fireDataChangeEvent(each, listener, DataChangedEvent.Type.UPDATED);
                    }
                }
            }
            // Every known key missing from the latest response has been removed.
            for (String each : watchKeys) {
                if (!newKeys.contains(each)) {
                    GetValue getValue = new GetValue();
                    getValue.setKey(each);
                    fireDataChangeEvent(getValue, listener, DataChangedEvent.Type.DELETED);
                }
            }
            watchKeyMap.put(key, newKeys);
        } else if (index < currentIndex) {
            // The index went backwards: restart from 0 so the next response re-seeds the known keys.
            currentIndex = 0;
        }
    }
}