func()

in pkg/cloudmap/instances_health_prober.go [109:165]


func (p *defaultInstancesHealthProber) probeLoop(ctx context.Context) {
	probeConfigByServiceSubset := make(map[serviceSubsetID]probeConfig)
	for {
		var timer <-chan time.Time
		if len(probeConfigByServiceSubset) > 0 {
			timer = time.After(p.probePeriod)
		}

		select {
		case <-ctx.Done():
			return
		case probeRequest := <-p.probeRequestChan:
			if len(probeRequest.instanceInfoByID) == 0 {
				delete(probeConfigByServiceSubset, probeRequest.serviceSubsetID)
			} else {
				probeEntries := make([]instanceProbeEntry, 0, len(probeRequest.instanceInfoByID))
				for instanceID, instanceInfo := range probeRequest.instanceInfoByID {
					probeEntries = append(probeEntries, instanceProbeEntry{
						instanceID:         instanceID,
						instanceInfo:       instanceInfo,
						lastTransitionTime: time.Time{},
						lastHealthyStatus:  false,
					})
				}
				probeConfigByServiceSubset[probeRequest.serviceSubsetID] = probeConfig{
					probeFunc:    probeRequest.probeFunc,
					probeEntries: probeEntries,
					timeoutTime:  time.Now().Add(probeRequest.timeout),
				}
			}
		case <-timer:
			for serviceSubsetID, instancesProbeConfig := range probeConfigByServiceSubset {
				if time.Now().After(instancesProbeConfig.timeoutTime) {
					p.log.Error(errors.New("timeout probe instances"),
						"serviceID", serviceSubsetID.serviceID,
						"subsetID", serviceSubsetID.subsetID)
					delete(probeConfigByServiceSubset, serviceSubsetID)
					continue
				}
				probeEntriesToContinue, err := p.probeInstances(ctx, serviceSubsetID.serviceID, instancesProbeConfig.probeFunc, instancesProbeConfig.probeEntries)
				if err != nil {
					p.log.Error(err, "failed to probe instances",
						"serviceID", serviceSubsetID.serviceID,
						"subsetID", serviceSubsetID.subsetID)
				} else if len(probeEntriesToContinue) == 0 {
					delete(probeConfigByServiceSubset, serviceSubsetID)
				} else {
					probeConfigByServiceSubset[serviceSubsetID] = probeConfig{
						probeFunc:    instancesProbeConfig.probeFunc,
						probeEntries: probeEntriesToContinue,
						timeoutTime:  instancesProbeConfig.timeoutTime,
					}
				}
			}
		}
	}
}