func()

in remoting/xds/client.go [394:513]


func (w *WrappedClientImpl) startWatchingAllClusterAndLoadLocalHostAddrAndIstioPodIP(localDebugMode bool) error {
	// call watch and refresh istiod debug interface
	foundLocalStopCh := make(chan struct{})
	foundIstiodStopCh := make(chan struct{})
	discoveryFinishedStopCh := make(chan struct{})
	// todo timeout configure
	timeoutCh := time.After(w.xdsSniffingTimeout)
	foundLocal := false
	foundIstiod := false
	var cancel1 func()
	var cancel2 func()
	logger.Infof("[XDS Wrapped Client] Start sniffing with istio hostname = %s, localIp = %s",
		w.istiodAddr.HostnameOrIP, w.localIP)

	// todo @(laurence) here, if istiod is unhealthy, here should be timeout and tell developer.
	_ = w.xdsClient.WatchCluster("*", func(update resource.ClusterUpdate, err error) {
		if update.ClusterName == "" {
			return
		}
		if update.ClusterName[:1] == constant.MeshDeleteClusterPrefix {
			// delete event
			w.cdsMapLock.Lock()
			defer w.cdsMapLock.Unlock()
			delete(w.cdsMap, update.ClusterName[1:])
			logger.Infof("[XDS Wrapped Client] Delete cluster %s", update.ClusterName)
			w.cdsUpdateEventChan <- struct{}{} // send update event
			return
		}
		w.cdsMapLock.Lock()
		w.cdsMap[update.ClusterName] = update
		w.cdsMapLock.Unlock()

		w.cdsUpdateEventChan <- struct{}{} // send update event
		if foundLocal && foundIstiod {
			return
		}
		logger.Infof("[XDS Wrapped Client] Sniffing with cluster name = %s", update.ClusterName)
		// only into here during start sniffing istiod/service prcedure
		cluster := xdsCommon.NewCluster(update.ClusterName)
		if cluster.Addr.HostnameOrIP == w.istiodAddr.HostnameOrIP {
			// 1. find istiod podIP
			// todo: When would eds level watch be canceled?
			logger.Info("[XDS Wrapped Client] Sniffing get istiod cluster")
			cancel1 = w.xdsClient.WatchEndpoints(update.ClusterName, func(endpoint resource.EndpointsUpdate, err error) {
				if foundIstiod {
					return
				}
				logger.Infof("[XDS Wrapped Client] Sniffing get istiod endpoint = %+v, localities = %+v", endpoint, endpoint.Localities)
				for _, v := range endpoint.Localities {
					for _, e := range v.Endpoints {
						w.istiodPodIP = xdsCommon.NewHostNameOrIPAddr(e.Address).HostnameOrIP
						logger.Infof("[XDS Wrapped Client] Sniffing found istiod podIP = %s", w.istiodPodIP)
						foundIstiod = true
						close(foundIstiodStopCh)
					}
				}
			})
			return
		}
		// 2. found local hostAddr
		// todo: When would eds level watch be canceled?
		cancel2 = w.xdsClient.WatchEndpoints(update.ClusterName, func(endpoint resource.EndpointsUpdate, err error) {
			if foundLocal {
				return
			}
			for _, v := range endpoint.Localities {
				for _, e := range v.Endpoints {
					logger.Infof("[XDS Wrapped Client] Sniffing Found eds endpoint = %+v", e)
					if xdsCommon.NewHostNameOrIPAddr(e.Address).HostnameOrIP == w.localIP {
						cluster := xdsCommon.NewCluster(update.ClusterName)
						w.hostAddr = cluster.Addr
						foundLocal = true
						close(foundLocalStopCh)
					}
				}
			}
		})
	})

	if localDebugMode {
		go func() {
			<-foundIstiodStopCh
			<-foundLocalStopCh
			cancel1()
			cancel2()
		}()
		return nil
	}

	go func() {
		<-foundIstiodStopCh
		<-foundLocalStopCh
		close(discoveryFinishedStopCh)
	}()

	select {
	case <-discoveryFinishedStopCh:
		// discovery success
		// waiting for cancel function to have value
		time.Sleep(time.Second)
		cancel1()
		cancel2()
		logger.Infof("[XDS Wrapper Client] Sniffing Finished with host addr = %s, istiod pod ip = %s", w.hostAddr, w.istiodPodIP)
		return nil
	case <-timeoutCh:
		logger.Warnf("[XDS Wrapper Client] Sniffing timeout with duration = %v", w.xdsSniffingTimeout)
		if cancel1 != nil {
			cancel1()
		}
		if cancel2 != nil {
			cancel2()
		}
		select {
		case <-foundIstiodStopCh:
			return DiscoverLocalError
		default:
			return DiscoverIstiodPodIpError
		}
	}
}