in pkg/discovery/etcd3.go [77:177]
func (s *EtcdRegistryService) watch(key string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resp, err := s.client.Get(ctx, key, etcd3.WithPrefix())
if err != nil {
log.Infof("cant get server instances from etcd")
}
if resp != nil {
for _, kv := range resp.Kvs {
k := kv.Key
v := kv.Value
clusterName, err := getClusterName(k)
if err != nil {
log.Errorf("etcd key has an incorrect format: ", err)
return
}
serverInstance, err := getServerInstance(v)
if err != nil {
log.Errorf("etcd value has an incorrect format: ", err)
return
}
s.rwLock.Lock()
if s.grouplist[clusterName] == nil {
s.grouplist[clusterName] = []*ServiceInstance{serverInstance}
} else {
s.grouplist[clusterName] = append(s.grouplist[clusterName], serverInstance)
}
s.rwLock.Unlock()
}
}
// watch the changes of endpoints
watchCh := s.client.Watch(ctx, key, etcd3.WithPrefix())
for {
select {
case watchResp, ok := <-watchCh:
if !ok {
log.Warnf("Watch channel closed")
return
}
for _, event := range watchResp.Events {
switch event.Type {
case etcd3.EventTypePut:
log.Infof("Key %s updated. New value: %s\n", event.Kv.Key, event.Kv.Value)
k := event.Kv.Key
v := event.Kv.Value
clusterName, err := getClusterName(k)
if err != nil {
log.Errorf("etcd key err: ", err)
return
}
serverInstance, err := getServerInstance(v)
if err != nil {
log.Errorf("etcd value err: ", err)
return
}
s.rwLock.Lock()
if s.grouplist[clusterName] == nil {
s.grouplist[clusterName] = []*ServiceInstance{serverInstance}
s.rwLock.Unlock()
continue
}
if ifHaveSameServiceInstances(s.grouplist[clusterName], serverInstance) {
s.rwLock.Unlock()
continue
}
s.grouplist[clusterName] = append(s.grouplist[clusterName], serverInstance)
s.rwLock.Unlock()
case etcd3.EventTypeDelete:
log.Infof("Key %s deleted.\n", event.Kv.Key)
cluster, ip, port, err := getClusterAndAddress(event.Kv.Key)
if err != nil {
log.Errorf("etcd key err: ", err)
return
}
s.rwLock.Lock()
serviceInstances := s.grouplist[cluster]
if serviceInstances == nil {
log.Warnf("etcd doesnt exit cluster: ", cluster)
s.rwLock.Unlock()
continue
}
s.grouplist[cluster] = removeValueFromList(serviceInstances, ip, port)
s.rwLock.Unlock()
}
}
case <-s.stopCh:
log.Warn("stop etcd watch")
return
}
}
}