func()

in pkg/discovery/etcd3.go [77:177]


// watch seeds s.grouplist with every service instance currently stored in
// etcd under the key prefix, then follows etcd watch events for that prefix
// and applies them to s.grouplist until the watch channel is closed or
// s.stopCh fires.
//
// The method blocks for the lifetime of the watch. All access to s.grouplist
// is done while holding s.rwLock. Entries whose key or value cannot be parsed
// are logged and skipped, so one bad record does not stop discovery updates.
func (s *EtcdRegistryService) watch(key string) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	watchOpts := []etcd3.OpOption{etcd3.WithPrefix()}

	resp, err := s.client.Get(ctx, key, etcd3.WithPrefix())
	if err != nil {
		log.Errorf("cant get server instances from etcd: %v", err)
	}

	if resp != nil {
		for _, kv := range resp.Kvs {
			clusterName, err := getClusterName(kv.Key)
			if err != nil {
				log.Errorf("etcd key has an incorrect format: %v", err)
				continue
			}
			serverInstance, err := getServerInstance(kv.Value)
			if err != nil {
				log.Errorf("etcd value has an incorrect format: %v", err)
				continue
			}

			s.rwLock.Lock()
			// append handles a missing (nil) cluster entry on its own.
			s.grouplist[clusterName] = append(s.grouplist[clusterName], serverInstance)
			s.rwLock.Unlock()
		}

		// Start watching right after the snapshot revision so changes made
		// between the Get above and the Watch below are not lost.
		if resp.Header != nil {
			watchOpts = append(watchOpts, etcd3.WithRev(resp.Header.Revision+1))
		}
	}

	// watch the changes of endpoints
	watchCh := s.client.Watch(ctx, key, watchOpts...)

	for {
		select {
		case watchResp, ok := <-watchCh:
			if !ok {
				log.Warnf("Watch channel closed")
				return
			}
			if err := watchResp.Err(); err != nil {
				// Surface the failure; if it is fatal the client closes the
				// channel and the !ok branch above ends the loop.
				log.Errorf("etcd watch err: %v", err)
				continue
			}

			for _, event := range watchResp.Events {
				switch event.Type {
				case etcd3.EventTypePut:
					log.Infof("Key %s updated. New value: %s\n", event.Kv.Key, event.Kv.Value)

					clusterName, err := getClusterName(event.Kv.Key)
					if err != nil {
						log.Errorf("etcd key err: %v", err)
						continue
					}
					serverInstance, err := getServerInstance(event.Kv.Value)
					if err != nil {
						log.Errorf("etcd value err: %v", err)
						continue
					}

					s.rwLock.Lock()
					instances := s.grouplist[clusterName]
					// Only add the instance when the cluster has no entry yet
					// or the instance is not already listed.
					if instances == nil || !ifHaveSameServiceInstances(instances, serverInstance) {
						s.grouplist[clusterName] = append(instances, serverInstance)
					}
					s.rwLock.Unlock()

				case etcd3.EventTypeDelete:
					log.Infof("Key %s deleted.\n", event.Kv.Key)

					cluster, ip, port, err := getClusterAndAddress(event.Kv.Key)
					if err != nil {
						log.Errorf("etcd key err: %v", err)
						continue
					}

					s.rwLock.Lock()
					serviceInstances := s.grouplist[cluster]
					if serviceInstances == nil {
						log.Warnf("etcd doesnt exit cluster: %v", cluster)
					} else {
						s.grouplist[cluster] = removeValueFromList(serviceInstances, ip, port)
					}
					s.rwLock.Unlock()
				}
			}
		case <-s.stopCh:
			log.Warn("stop etcd watch")
			return
		}
	}
}