in ingestor/service.go [242:307]
// Open starts the service. It derives a cancellable context from ctx, stores
// the cancel function in s.closeFn, and then opens the store, coordinator,
// batcher, replicator, metrics, and scheduler in that order. Once all of them
// are open it registers the periodic scheduler tasks: a health-check gauge,
// and per Kusto client the table cleanup, function sync, and management
// command tasks.
//
// The first error returned by a component's Open is returned unchanged. In
// that case the derived context is cancelled before returning, so components
// that were already opened with it are signalled to stop instead of being
// left running against a half-started service.
func (s *Service) Open(ctx context.Context) error {
	svcCtx, cancel := context.WithCancel(ctx)
	s.closeFn = cancel

	// Cancel the service context on any failure path (including a panic).
	// Cancelling more than once is harmless, so a later call to s.closeFn
	// remains safe.
	opened := false
	defer func() {
		if !opened {
			cancel()
		}
	}()

	if err := s.store.Open(svcCtx); err != nil {
		return err
	}
	if err := s.coordinator.Open(svcCtx); err != nil {
		return err
	}
	if err := s.batcher.Open(svcCtx); err != nil {
		return err
	}
	if err := s.replicator.Open(svcCtx); err != nil {
		return err
	}
	if err := s.metrics.Open(svcCtx); err != nil {
		return err
	}
	if err := s.scheduler.Open(svcCtx); err != nil {
		return err
	}

	// Periodically publish a constant 1 for this region's health gauge.
	s.scheduler.ScheduleEvery(time.Minute, "ingestor-health-check", func(ctx context.Context) error {
		metrics.IngestorHealthCheck.WithLabelValues(s.opts.Region).Set(1)
		return nil
	})

	// Both stores are shared by every task created below.
	fnStore := ingestorstorage.NewFunctions(s.opts.K8sCtrlCli, s.coordinator)
	crdStore := ingestorstorage.NewCRDHandler(s.opts.K8sCtrlCli, s.coordinator)

	// Metrics clusters: table cleanup, function sync, and management commands.
	for _, v := range s.opts.MetricsKustoCli {
		t := adx.NewDropUnusedTablesTask(v)
		s.scheduler.ScheduleEvery(12*time.Hour, "delete-unused-tables", func(ctx context.Context) error {
			return t.Run(ctx)
		})
		f := adx.NewSyncFunctionsTask(fnStore, v)
		s.scheduler.ScheduleEvery(time.Minute, "sync-metrics-functions", func(ctx context.Context) error {
			return f.Run(ctx)
		})
		m := adx.NewManagementCommandsTask(crdStore, v)
		// NOTE(review): "management-commands" is also used for the logs
		// clusters below, and every task name repeats per client; confirm the
		// scheduler tolerates duplicate names.
		s.scheduler.ScheduleEvery(10*time.Minute, "management-commands", func(ctx context.Context) error {
			return m.Run(ctx)
		})
	}

	// Logs clusters: function sync and management commands only.
	for _, v := range s.opts.LogsKustoCli {
		f := adx.NewSyncFunctionsTask(fnStore, v)
		s.scheduler.ScheduleEvery(time.Minute, "sync-logs-functions", func(ctx context.Context) error {
			return f.Run(ctx)
		})
		m := adx.NewManagementCommandsTask(crdStore, v)
		s.scheduler.ScheduleEvery(10*time.Minute, "management-commands", func(ctx context.Context) error {
			return m.Run(ctx)
		})
	}

	opened = true
	return nil
}