in remoting/zookeeper/listener.go [244:317]
func (l *ZkEventListener) listenAllDirEvents(conf *common.URL, listener remoting.DataListener) {
var (
failTimes int
ttl time.Duration
)
ttl = defaultTTL
if conf != nil {
if timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL)); err == nil {
ttl = timeout
} else {
logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
}
}
if ttl > 20e9 {
ttl = 20e9
}
rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
for {
// get all interfaces
children, childEventCh, err := l.Client.GetChildrenW(rootPath)
if err != nil {
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", rootPath, err)
// Maybe the zookeeper does not ready yet, sleep failTimes * ConnDelay senconds to wait
after := time.After(timeSecondDuration(failTimes * ConnDelay))
select {
case <-after:
continue
case <-l.exit:
return
}
}
failTimes = 0
if len(children) == 0 {
logger.Warnf("[Zookeeper EventListener][listenDirEvent] Can not get any children for the path \"%s\", please check if the provider does ready.", rootPath)
}
for _, c := range children {
// Build the child path
zkRootPath := path.Join(rootPath, constant.PathSeparator, url.QueryEscape(c), constant.PathSeparator, constant.ProvidersCategory)
// Save the path to avoid listen repeatedly
l.pathMapLock.Lock()
if _, ok := l.pathMap[zkRootPath]; ok {
logger.Warnf("[Zookeeper EventListener][listenDirEvent] The child with zk path {%s} has already been listened.", zkRootPath)
l.pathMapLock.Unlock()
continue
} else {
l.pathMap[zkRootPath] = uatomic.NewInt32(0)
}
l.pathMapLock.Unlock()
logger.Debugf("[Zookeeper EventListener][listenDirEvent] listen dubbo interface key{%s}", zkRootPath)
l.wg.Add(1)
// listen every interface
go l.listenDirEvent(conf, zkRootPath, listener, c)
}
ticker := time.NewTicker(ttl)
select {
case <-ticker.C:
ticker.Stop()
case zkEvent := <-childEventCh:
logger.Debugf("Get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State), zkEvent.Err)
ticker.Stop()
case <-l.exit:
logger.Warnf("listen(path{%s}) goroutine exit now...", rootPath)
ticker.Stop()
return
}
}
}