private void watchChildKeyChangeEvent()

in mode/cluster/repository/consul/src/main/java/org/apache/shardingsphere/mode/repository/cluster/consul/ConsulRepository.java [182:228]


    private void watchChildKeyChangeEvent(final String key, final DataChangedEventListener listener) {
        AtomicBoolean running = new AtomicBoolean(true);
        long currentIndex = 0;
        while (running.get()) {
            Response<List<GetValue>> response = consulClient.getKVValues(key, new QueryParams(consulProps.getValue(ConsulPropertyKey.BLOCK_QUERY_TIME_TO_SECONDS), currentIndex));
            List<GetValue> value = response.getValue();
            if (null == value) {
                continue;
            }
            Long index = response.getConsulIndex();
            if (null != index && 0 == currentIndex) {
                currentIndex = index;
                if (!watchKeyMap.containsKey(key)) {
                    watchKeyMap.put(key, new HashSet<>());
                }
                Collection<String> watchKeys = watchKeyMap.get(key);
                for (GetValue each : value) {
                    watchKeys.add(each.getKey());
                }
                continue;
            }
            if (null != index && index > currentIndex) {
                currentIndex = index;
                Collection<String> newKeys = new HashSet<>(value.size(), 1F);
                Collection<String> watchKeys = watchKeyMap.get(key);
                for (GetValue each : value) {
                    newKeys.add(each.getKey());
                    if (!watchKeys.contains(each.getKey())) {
                        watchKeys.add(each.getKey());
                        fireDataChangeEvent(each, listener, DataChangedEvent.Type.ADDED);
                    } else if (watchKeys.contains(each.getKey()) && each.getModifyIndex() >= currentIndex) {
                        fireDataChangeEvent(each, listener, DataChangedEvent.Type.UPDATED);
                    }
                }
                for (String each : watchKeys) {
                    if (!newKeys.contains(each)) {
                        GetValue getValue = new GetValue();
                        getValue.setKey(each);
                        fireDataChangeEvent(getValue, listener, DataChangedEvent.Type.DELETED);
                    }
                }
                watchKeyMap.put(key, newKeys);
            } else if (null != index && index < currentIndex) {
                currentIndex = 0;
            }
        }
    }