in pkg/cloudmap/instances_health_prober.go [109:165]
// probeLoop is the prober's event loop. It tracks one probeConfig per service
// subset and runs until ctx is done, reacting to three kinds of events:
//
//   - ctx is done: the loop returns.
//   - a request arrives on p.probeRequestChan: a request without instances
//     removes the subset's config; any other request replaces the subset's
//     config with fresh probe entries and a deadline of now + request timeout.
//   - the probe timer fires: every tracked subset is handled once. Subsets past
//     their deadline are dropped with an error log; the others are passed to
//     p.probeInstances, whose returned entries replace the subset's entries
//     (the subset is dropped when none are returned).
//
// The timer is only armed while at least one subset is tracked, and it is
// re-created at the top of every iteration, so handling a probe request
// restarts the p.probePeriod wait.
func (p *defaultInstancesHealthProber) probeLoop(ctx context.Context) {
	probeConfigByServiceSubset := make(map[serviceSubsetID]probeConfig)
	for {
		// A nil channel never becomes ready in a select, so the timer case is
		// effectively disabled while there is nothing to probe.
		var timer <-chan time.Time
		if len(probeConfigByServiceSubset) > 0 {
			timer = time.After(p.probePeriod)
		}

		select {
		case <-ctx.Done():
			return

		case probeRequest := <-p.probeRequestChan:
			// An empty instance set means "stop probing this subset".
			if len(probeRequest.instanceInfoByID) == 0 {
				delete(probeConfigByServiceSubset, probeRequest.serviceSubsetID)
				continue
			}

			// Every instance starts with a zero transition time and an
			// unhealthy status; any previous config for the subset is replaced.
			probeEntries := make([]instanceProbeEntry, 0, len(probeRequest.instanceInfoByID))
			for instanceID, instanceInfo := range probeRequest.instanceInfoByID {
				probeEntries = append(probeEntries, instanceProbeEntry{
					instanceID:         instanceID,
					instanceInfo:       instanceInfo,
					lastTransitionTime: time.Time{},
					lastHealthyStatus:  false,
				})
			}
			probeConfigByServiceSubset[probeRequest.serviceSubsetID] = probeConfig{
				probeFunc:    probeRequest.probeFunc,
				probeEntries: probeEntries,
				timeoutTime:  time.Now().Add(probeRequest.timeout),
			}

		case <-timer:
			// Deleting from and overwriting existing keys of a map while
			// ranging over it is permitted in Go.
			for serviceSubsetID, instancesProbeConfig := range probeConfigByServiceSubset {
				if time.Now().After(instancesProbeConfig.timeoutTime) {
					// The logger's Error is called as (err, msg, key/values...),
					// matching the probe-failure log below; the explicit message
					// keeps the key/value pairs aligned.
					p.log.Error(errors.New("timeout probe instances"), "stopped probing instances",
						"serviceID", serviceSubsetID.serviceID,
						"subsetID", serviceSubsetID.subsetID)
					delete(probeConfigByServiceSubset, serviceSubsetID)
					continue
				}

				probeEntriesToContinue, err := p.probeInstances(ctx, serviceSubsetID.serviceID, instancesProbeConfig.probeFunc, instancesProbeConfig.probeEntries)
				switch {
				case err != nil:
					// Keep the current entries so the subset is retried on the
					// next timer tick (until its deadline passes).
					p.log.Error(err, "failed to probe instances",
						"serviceID", serviceSubsetID.serviceID,
						"subsetID", serviceSubsetID.subsetID)
				case len(probeEntriesToContinue) == 0:
					delete(probeConfigByServiceSubset, serviceSubsetID)
				default:
					// Carry over the probe function and the original deadline.
					probeConfigByServiceSubset[serviceSubsetID] = probeConfig{
						probeFunc:    instancesProbeConfig.probeFunc,
						probeEntries: probeEntriesToContinue,
						timeoutTime:  instancesProbeConfig.timeoutTime,
					}
				}
			}
		}
	}
}