func()

in internal/servicecommunication/discovery/discovery.go [116:157]


func (d Service) CommonDiscovery(ctx context.Context, a any) {
	log.CtxLogger(ctx).Info("CommonDiscovery started")
	var chs []chan<- *servicecommunication.Message
	var ok bool
	if chs, ok = a.([]chan<- *servicecommunication.Message); !ok {
		log.CtxLogger(ctx).Warnw("args is not of type []chan servicecommunication.Message", "args", a, "type", reflect.TypeOf(a), "kind", reflect.TypeOf(a).Kind())
		return
	}
	frequency := 3 * time.Hour
	if d.Config.GetCommonDiscovery().GetCollectionFrequency() != nil {
		frequency = d.Config.GetCommonDiscovery().GetCollectionFrequency().AsDuration()
	}
	ticker := time.NewTicker(frequency)
	defer ticker.Stop()
	for {
		discoveryResult, err := d.commonDiscoveryLoop(ctx)
		if err != nil {
			log.CtxLogger(ctx).Errorw("Failed to perform common discovery", "error", err)
			return
		}
		log.CtxLogger(ctx).Infof("CommonDiscovery found %d processes.", len(discoveryResult.Processes))
		fullChs := 0
		for _, ch := range chs {
			select {
			case ch <- &servicecommunication.Message{Origin: servicecommunication.Discovery, DiscoveryResult: discoveryResult}:
			default:
				fullChs++
			}
		}
		if fullChs > 0 {
			log.CtxLogger(ctx).Infof("CommonDiscovery found %d full channels that it was unable to write to.", fullChs)
		}
		select {
		case <-ctx.Done():
			log.CtxLogger(ctx).Info("CommonDiscovery cancellation requested")
			return
		case <-ticker.C:
			log.CtxLogger(ctx).Debug("CommonDiscovery ticker fired")
			continue
		}
	}
}