in remoting/xds/client.go [394:513]
// startWatchingAllClusterAndLoadLocalHostAddrAndIstioPodIP registers a watch on
// every cluster ("*") and, while the first updates arrive, sniffs two values
// out of the endpoint data:
//
//  1. w.istiodPodIP - the address of an endpoint of the cluster whose host
//     equals w.istiodAddr.HostnameOrIP;
//  2. w.hostAddr - the address of the cluster that owns an endpoint whose
//     host equals w.localIP.
//
// Every cluster update is also mirrored into w.cdsMap and announced on
// w.cdsUpdateEventChan; that part keeps running after sniffing has finished,
// because the cluster watch itself is never cancelled here.
//
// When localDebugMode is true the function returns nil immediately and the
// sniffing watches are cleaned up in the background once both values are
// found. Otherwise it blocks until both values are found (returns nil) or
// until w.xdsSniffingTimeout elapses, in which case it returns
// DiscoverLocalError if istiod was found but the local address was not, and
// DiscoverIstiodPodIpError if istiod was not found.
//
// NOTE(review): foundLocal, foundIstiod, cancel1 and cancel2 are shared between
// this goroutine and the watch callbacks without synchronisation, and cancel2
// is overwritten for every non-istiod cluster so only the most recently
// registered endpoint watch is cancelled. Both are pre-existing and left as is.
func (w *WrappedClientImpl) startWatchingAllClusterAndLoadLocalHostAddrAndIstioPodIP(localDebugMode bool) error {
	// call watch and refresh istiod debug interface
	foundLocalStopCh := make(chan struct{})
	foundIstiodStopCh := make(chan struct{})
	discoveryFinishedStopCh := make(chan struct{})
	// todo timeout configure
	timeoutCh := time.After(w.xdsSniffingTimeout)
	foundLocal := false
	foundIstiod := false
	var cancel1 func()
	var cancel2 func()

	// cancelSniffingWatches cancels whichever endpoint watches were registered.
	// Either function may still be nil (for example when no matching cluster
	// was ever seen), so both are guarded on every exit path.
	cancelSniffingWatches := func() {
		if cancel1 != nil {
			cancel1()
		}
		if cancel2 != nil {
			cancel2()
		}
	}

	logger.Infof("[XDS Wrapped Client] Start sniffing with istio hostname = %s, localIp = %s",
		w.istiodAddr.HostnameOrIP, w.localIP)
	// todo @(laurence) here, if istiod is unhealthy, here should be timeout and tell developer.
	// The value returned by WatchCluster is deliberately discarded: the cluster
	// watch keeps feeding w.cdsMap after this function returns.
	_ = w.xdsClient.WatchCluster("*", func(update resource.ClusterUpdate, err error) {
		if update.ClusterName == "" {
			return
		}
		if update.ClusterName[:1] == constant.MeshDeleteClusterPrefix {
			// delete event
			w.cdsMapLock.Lock()
			delete(w.cdsMap, update.ClusterName[1:])
			// Release the lock before the channel send, exactly like the
			// add/update path below, so that a slow receiver does not keep
			// cdsMap locked.
			w.cdsMapLock.Unlock()
			logger.Infof("[XDS Wrapped Client] Delete cluster %s", update.ClusterName)
			w.cdsUpdateEventChan <- struct{}{} // send update event
			return
		}
		w.cdsMapLock.Lock()
		w.cdsMap[update.ClusterName] = update
		w.cdsMapLock.Unlock()
		w.cdsUpdateEventChan <- struct{}{} // send update event
		if foundLocal && foundIstiod {
			return
		}
		logger.Infof("[XDS Wrapped Client] Sniffing with cluster name = %s", update.ClusterName)
		// only into here during start sniffing istiod/service procedure
		cluster := xdsCommon.NewCluster(update.ClusterName)
		if cluster.Addr.HostnameOrIP == w.istiodAddr.HostnameOrIP {
			// 1. find istiod podIP
			// todo: When would eds level watch be canceled?
			logger.Info("[XDS Wrapped Client] Sniffing get istiod cluster")
			cancel1 = w.xdsClient.WatchEndpoints(update.ClusterName, func(endpoint resource.EndpointsUpdate, err error) {
				if foundIstiod {
					return
				}
				logger.Infof("[XDS Wrapped Client] Sniffing get istiod endpoint = %+v, localities = %+v", endpoint, endpoint.Localities)
				for _, v := range endpoint.Localities {
					for _, e := range v.Endpoints {
						w.istiodPodIP = xdsCommon.NewHostNameOrIPAddr(e.Address).HostnameOrIP
						logger.Infof("[XDS Wrapped Client] Sniffing found istiod podIP = %s", w.istiodPodIP)
						foundIstiod = true
						close(foundIstiodStopCh)
						// Stop at the first endpoint: closing the channel a
						// second time (several istiod endpoints in one
						// update) would panic.
						return
					}
				}
			})
			return
		}
		// 2. found local hostAddr
		// todo: When would eds level watch be canceled?
		cancel2 = w.xdsClient.WatchEndpoints(update.ClusterName, func(endpoint resource.EndpointsUpdate, err error) {
			if foundLocal {
				return
			}
			for _, v := range endpoint.Localities {
				for _, e := range v.Endpoints {
					logger.Infof("[XDS Wrapped Client] Sniffing Found eds endpoint = %+v", e)
					if xdsCommon.NewHostNameOrIPAddr(e.Address).HostnameOrIP != w.localIP {
						continue
					}
					w.hostAddr = xdsCommon.NewCluster(update.ClusterName).Addr
					foundLocal = true
					close(foundLocalStopCh)
					// Stop at the first match: a second endpoint carrying the
					// local IP would otherwise close the channel again and
					// panic.
					return
				}
			}
		})
	})

	if localDebugMode {
		// Do not block the caller; clean up once both values have been found.
		go func() {
			<-foundIstiodStopCh
			<-foundLocalStopCh
			cancelSniffingWatches()
		}()
		return nil
	}

	// Collapse the two "found" signals into a single channel for the select.
	go func() {
		<-foundIstiodStopCh
		<-foundLocalStopCh
		close(discoveryFinishedStopCh)
	}()

	select {
	case <-discoveryFinishedStopCh:
		// discovery success
		// waiting for cancel function to have value
		time.Sleep(time.Second)
		cancelSniffingWatches()
		logger.Infof("[XDS Wrapper Client] Sniffing Finished with host addr = %s, istiod pod ip = %s", w.hostAddr, w.istiodPodIP)
		return nil
	case <-timeoutCh:
		logger.Warnf("[XDS Wrapper Client] Sniffing timeout with duration = %v", w.xdsSniffingTimeout)
		cancelSniffingWatches()
		// Report which half of the discovery is missing.
		select {
		case <-foundIstiodStopCh:
			return DiscoverLocalError
		default:
			return DiscoverIstiodPodIpError
		}
	}
}