func()

in remoting/zookeeper/listener.go [243:316]


// listenAllDirEvents watches the children of the dubbo root path and starts a
// listenDirEvent goroutine for the providers category of every child whose
// path is not yet recorded in l.pathMap.
//
// The children are read again whenever the child watch fires or the refresh
// interval elapses. The interval is parsed from the registry TTL parameter of
// conf; defaultTTL is used when conf is nil or the configured value is
// malformed or non-positive, and the result is capped at 20 seconds. When
// reading the children fails, the call is retried after a delay that grows
// with the number of consecutive failures (bounded by MaxFailTimes).
//
// The method loops until l.exit becomes readable.
func (l *ZkEventListener) listenAllDirEvents(conf *common.URL, listener remoting.DataListener) {
	// Upper bound for the refresh interval.
	const maxTTL = 20 * time.Second

	var (
		failTimes int
		ttl       time.Duration
	)
	ttl = defaultTTL
	if conf != nil {
		timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL))
		switch {
		case err != nil:
			logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
		case timeout <= 0:
			// ParseDuration accepts "0s" and negative values; waiting on such an
			// interval would either panic (ticker) or spin (timer), so reject it.
			logger.Warnf("[Zookeeper EventListener][listenAllDirEvents] Non-positive configuration for registry.ttl (%v), using default value %v instead", timeout, defaultTTL)
		default:
			ttl = timeout
		}
	}
	if ttl > maxTTL {
		ttl = maxTTL
	}

	rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
	for {
		// get all interfaces
		children, childEventCh, err := l.Client.GetChildrenW(rootPath)
		if err != nil {
			failTimes++
			if MaxFailTimes <= failTimes {
				failTimes = MaxFailTimes
			}
			logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", rootPath, err)
			// Maybe the zookeeper is not ready yet, sleep failTimes * ConnDelay seconds to wait
			after := time.After(timeSecondDuration(failTimes * ConnDelay))
			select {
			case <-after:
				continue
			case <-l.exit:
				return
			}
		}
		failTimes = 0
		if len(children) == 0 {
			logger.Warnf("[Zookeeper EventListener][listenDirEvent] Can not get any children for the path \"%s\", please check if the provider does ready.", rootPath)
		}
		for _, c := range children {
			// Build the child path
			zkRootPath := path.Join(rootPath, constant.PathSeparator, url.QueryEscape(c), constant.PathSeparator, constant.ProvidersCategory)

			// Record the path under the lock so each path is listened to only once.
			l.pathMapLock.Lock()
			_, listened := l.pathMap[zkRootPath]
			if !listened {
				l.pathMap[zkRootPath] = uatomic.NewInt32(0)
			}
			l.pathMapLock.Unlock()
			if listened {
				logger.Warnf("[Zookeeper EventListener][listenDirEvent] The child with zk path {%s} has already been listened.", zkRootPath)
				continue
			}

			logger.Debugf("[Zookeeper EventListener][listenDirEvent] listen dubbo interface key{%s}", zkRootPath)
			l.wg.Add(1)
			// listen every interface
			go l.listenDirEvent(conf, zkRootPath, listener, c)
		}

		// A fresh one-shot timer per iteration: only a single expiry is ever
		// consumed before the children are re-read.
		refresh := time.NewTimer(ttl)
		select {
		case <-refresh.C:
			// Interval elapsed: loop around and re-read the children.
		case zkEvent := <-childEventCh:
			logger.Debugf("Get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
				zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State), zkEvent.Err)
			refresh.Stop()
		case <-l.exit:
			logger.Warnf("listen(path{%s}) goroutine exit now...", rootPath)
			refresh.Stop()
			return
		}
	}
}