public void doSubscribe()

in dubbo-registry-extensions/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java [135:243]


    public void doSubscribe(URL url, NotifyListener listener) {
        try {
            if (ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();

                /*
                 *  if we are interested in all interfaces,
                 *  find out the current container or create one for the url, put or get only once.
                 */
                ConcurrentMap<NotifyListener, ChildListener> listeners =
                        Optional.ofNullable(etcdListeners.get(url))
                                .orElseGet(() -> {
                                    ConcurrentMap<NotifyListener, ChildListener> container, prev;
                                    prev = etcdListeners.putIfAbsent(url, container = new ConcurrentHashMap<>());
                                    return prev != null ? prev : container;
                                });

                /*
                 *  if we have no interface watcher listener,
                 *  find the current listener or create one for the current root, put or get only once.
                 */
                ChildListener interfaceListener =
                        Optional.ofNullable(listeners.get(listener))
                                .orElseGet(() -> {
                                    ChildListener childListener, prev;
                                    prev = listeners.putIfAbsent(listener, childListener = (parentPath, currentChildren) -> {
                                        /*
                                         *  because etcd3 does not support direct children watch events,
                                         *  we should filter not interface events. if we watch /dubbo
                                         *  and /dubbo/interface, when we put a key-value pair {/dubbo/interface/hello hello},
                                         *  we will got events in watching path /dubbo.
                                         */
                                        for (String child : currentChildren) {
                                            child = URL.decode(child);
                                            if (!anyServices.contains(child)) {
                                                anyServices.add(child);
                                                /*
                                                 *  if new interface event arrived, we watch their direct children,
                                                 *  eg: /dubbo/interface, /dubbo/interface and so on.
                                                 */
                                                subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                                        CHECK_KEY, String.valueOf(false)), listener);
                                            }
                                        }
                                    });
                                    return prev != null ? prev : childListener;
                                });

                etcdClient.create(root);
                /*
                 * at the first time, we want to pull already interface and then watch their direct children,
                 *  eg: /dubbo/interface, /dubbo/interface and so on.
                 */
                List<String> services = etcdClient.addChildListener(root, interfaceListener);
                for (String service : services) {
                    service = URL.decode(service);
                    anyServices.add(service);
                    subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                            CHECK_KEY, String.valueOf(false)), listener);
                }
            } else {
                List<URL> urls = new ArrayList<>();
                for (String path : toCategoriesPath(url)) {

                    /*
                     *  if we are interested in special categories (providers, consumers, routers and so on),
                     *  we find out the current container or create one for the url, put or get only once.
                     */
                    ConcurrentMap<NotifyListener, ChildListener> listeners =
                            Optional.ofNullable(etcdListeners.get(url))
                                    .orElseGet(() -> {
                                        ConcurrentMap<NotifyListener, ChildListener> container, prev;
                                        prev = etcdListeners.putIfAbsent(url,
                                                container = new ConcurrentHashMap<>());
                                        return prev != null ? prev : container;
                                    });

                    /*
                     *  if we have no category watcher listener,
                     *  we find out the current listener or create one for the current category, put or get only once.
                     */
                    ChildListener childListener =
                            Optional.ofNullable(listeners.get(listener))
                                    .orElseGet(() -> {
                                        ChildListener watchListener, prev;
                                        prev = listeners.putIfAbsent(listener, watchListener = (parentPath, currentChildren) -> EtcdRegistry.this.notify(url, listener,
                                                toUrlsWithEmpty(url, parentPath, currentChildren)));
                                        return prev != null ? prev : watchListener;
                                    });

                    etcdClient.create(path);
                    /*
                     * at the first time, we want to pull already category and then watch their direct children,
                     *  eg: /dubbo/interface/providers, /dubbo/interface/consumers and so on.
                     */
                    List<String> children = etcdClient.addChildListener(path, childListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to etcd " + getUrl()
                    + ", cause: " + (OptionUtil.isProtocolError(e)
                    ? "etcd3 registry may not be supported yet or etcd3 registry is not available."
                    : e.getMessage()), e);
        }
    }