in remoting/zookeeper/listener.go [132:184]
// listenServiceNodeEvent watches zkPath with ExistW and forwards node events
// to the first of the supplied listeners (any further listeners are ignored).
//
// The path must already have an entry in l.pathMap; the entry's counter is
// incremented while this call is running and decremented on return. The call
// is refused (returns false) when the path has no entry or its counter is
// already greater than 1.
//
// Return value:
//   - true  when the watched node is deleted;
//   - false when the call is refused, when ExistW or the follow-up Get fails,
//     or when l.exit fires.
func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
	l.pathMapLock.Lock()
	a, ok := l.pathMap[zkPath]
	if !ok || a.Load() > 1 {
		l.pathMapLock.Unlock()
		return false
	}
	a.Inc()
	l.pathMapLock.Unlock()
	defer a.Dec()

	// notify reads the current node content and hands the completed event to
	// the first listener. It reports false only when the content cannot be
	// read; with no listener registered it is a no-op that reports true.
	notify := func(event remoting.Event) bool {
		if len(listener) == 0 {
			return true
		}
		content, _, err := l.Client.Conn.Get(event.Path)
		if err != nil {
			logger.Warnf("zk.Conn.Get{key:%s} = error{%v}", zkPath, err)
			return false
		}
		event.Content = string(content)
		listener[0].DataChange(event)
		return true
	}

	for {
		// A new watch is requested on every iteration, after each handled event.
		keyEventCh, err := l.Client.ExistW(zkPath)
		if err != nil {
			logger.Warnf("existW{key:%s} = error{%v}", zkPath, err)
			return false
		}

		select {
		case zkEvent := <-keyEventCh:
			// zkEvent.Err is nil for ordinary events, so it is formatted with
			// %v; %s would render a nil error as "%!s(<nil>)".
			logger.Warnf("get a zookeeper keyEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
				zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State), zkEvent.Err)

			switch zkEvent.Type {
			case zk.EventNodeDataChanged:
				logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDataChanged}", zkPath)
				if !notify(remoting.Event{Path: zkEvent.Path, Action: remoting.EventTypeUpdate}) {
					return false
				}
			case zk.EventNodeCreated:
				logger.Warnf("[ZkEventListener][listenServiceNodeEvent]Get a EventNodeCreated event for path {%s}", zkPath)
				if !notify(remoting.Event{Path: zkEvent.Path, Action: remoting.EventTypeAdd}) {
					return false
				}
			case zk.EventNotWatching:
				// Only logged; the loop goes on to call ExistW again.
				logger.Infof("[ZkEventListener][listenServiceNodeEvent]Get a EventNotWatching event for path {%s}", zkPath)
			case zk.EventNodeDeleted:
				logger.Infof("[ZkEventListener][listenServiceNodeEvent]Get a EventNodeDeleted event for path {%s}", zkPath)
				return true
			}
		case <-l.exit:
			return false
		}
	}
}