private void watchConfigKeyValues()

in shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java [122:203]


    private void watchConfigKeyValues(final String watchPathRoot,
                                      final BiConsumer<String, String> updateHandler,
                                      final Consumer<String> deleteHandler) {
        try {
            Long currentIndex = this.consulIndexes.get(watchPathRoot);
            if (Objects.isNull(currentIndex)) {
                currentIndex = ConsulConstants.INIT_CONFIG_VERSION_INDEX;
            }
            Response<List<GetValue>> response = this.consulClient.getKVValues(watchPathRoot, null,
                    new QueryParams(TimeUnit.MILLISECONDS.toSeconds(consulConfig.getWaitTime()), currentIndex));
            if (Objects.isNull(response.getValue()) || response.getValue().isEmpty()) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("No value for watchPathRoot {}", watchPathRoot);
                }
                this.executor.schedule(() -> watchConfigKeyValues(watchPathRoot, updateHandler, deleteHandler),
                        consulConfig.getWatchDelay(), TimeUnit.MILLISECONDS);
                return;
            }
            Long newIndex = response.getConsulIndex();
            if (Objects.isNull(newIndex)) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Same index for watchPathRoot {}", watchPathRoot);
                }
                this.executor.schedule(() -> watchConfigKeyValues(watchPathRoot, updateHandler, deleteHandler),
                        consulConfig.getWatchDelay(), TimeUnit.MILLISECONDS);
                return;
            }
            if (Objects.equals(newIndex, currentIndex)) {
                this.executor.schedule(() -> watchConfigKeyValues(watchPathRoot, updateHandler, deleteHandler),
                        -1, TimeUnit.MILLISECONDS);
                return;
            }
            if (!this.consulIndexes.containsValue(newIndex)
                    && !currentIndex.equals(ConsulConstants.INIT_CONFIG_VERSION_INDEX)) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("watchPathRoot {} has new index {}", watchPathRoot, newIndex);
                }
                final Long lastIndex = currentIndex;
                final List<ConsulData> lastDatas = cacheConsulDataKeyMap.get(watchPathRoot);
                response.getValue().forEach(data -> {
                    if (data.getModifyIndex() == lastIndex) {
                        //data has not changed
                        return;
                    }
                    if (Objects.nonNull(lastDatas)) {
                        final ConsulData consulData = lastDatas.stream()
                                .filter(lastData -> data.getKey().equals(lastData.getConsulKey())).findFirst().orElse(null);
                        if (Objects.nonNull(consulData) && !StringUtils.isBlank(consulData.getConsulDataMd5())
                                && consulData.getConsulDataMd5().equals(DigestUtils.md5Hex(data.getValue()))) {
                            return;
                        }
                    }
                    updateHandler.accept(data.getKey(), data.getDecodedValue());
                });
                final List<String> currentKeys = response.getValue().stream().map(GetValue::getKey).collect(Collectors.toList());
                if (!ObjectUtils.isEmpty(lastDatas)) {
                    // handler delete event
                    lastDatas.stream()
                            .map(ConsulData::getConsulKey)
                            .filter(lastKey -> !currentKeys.contains(lastKey))
                            .forEach(deleteHandler);
                }

                // save last Keys
                cacheConsulDataKeyMap.put(watchPathRoot, response.getValue().stream().map(data -> {
                    final ConsulData consulData = new ConsulData();
                    consulData.setConsulKey(data.getKey());
                    consulData.setConsulDataMd5(DigestUtils.md5Hex(data.getValue()));
                    return consulData;
                }).collect(Collectors.toList()));
            } else if (LOG.isTraceEnabled()) {
                LOG.info("Event for index already published for watchPathRoot {}", watchPathRoot);
            }
            this.consulIndexes.put(watchPathRoot, newIndex);
            this.executor.schedule(() -> watchConfigKeyValues(watchPathRoot, updateHandler, deleteHandler),
                    -1, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            LOG.warn("Error querying consul Key/Values for watchPathRoot '{}'. Message: ", watchPathRoot, e);
            this.executor.schedule(() -> watchConfigKeyValues(watchPathRoot, updateHandler, deleteHandler),
                    consulConfig.getWatchDelay(), TimeUnit.MILLISECONDS);
        }
    }