in internal/servicecommunication/discovery/discovery.go [116:157]
func (d Service) CommonDiscovery(ctx context.Context, a any) {
log.CtxLogger(ctx).Info("CommonDiscovery started")
var chs []chan<- *servicecommunication.Message
var ok bool
if chs, ok = a.([]chan<- *servicecommunication.Message); !ok {
log.CtxLogger(ctx).Warnw("args is not of type []chan servicecommunication.Message", "args", a, "type", reflect.TypeOf(a), "kind", reflect.TypeOf(a).Kind())
return
}
frequency := 3 * time.Hour
if d.Config.GetCommonDiscovery().GetCollectionFrequency() != nil {
frequency = d.Config.GetCommonDiscovery().GetCollectionFrequency().AsDuration()
}
ticker := time.NewTicker(frequency)
defer ticker.Stop()
for {
discoveryResult, err := d.commonDiscoveryLoop(ctx)
if err != nil {
log.CtxLogger(ctx).Errorw("Failed to perform common discovery", "error", err)
return
}
log.CtxLogger(ctx).Infof("CommonDiscovery found %d processes.", len(discoveryResult.Processes))
fullChs := 0
for _, ch := range chs {
select {
case ch <- &servicecommunication.Message{Origin: servicecommunication.Discovery, DiscoveryResult: discoveryResult}:
default:
fullChs++
}
}
if fullChs > 0 {
log.CtxLogger(ctx).Infof("CommonDiscovery found %d full channels that it was unable to write to.", fullChs)
}
select {
case <-ctx.Done():
log.CtxLogger(ctx).Info("CommonDiscovery cancellation requested")
return
case <-ticker.C:
log.CtxLogger(ctx).Debug("CommonDiscovery ticker fired")
continue
}
}
}