func()

in ingestor/service.go [242:307]


func (s *Service) Open(ctx context.Context) error {
	var svcCtx context.Context
	svcCtx, s.closeFn = context.WithCancel(ctx)

	if err := s.store.Open(svcCtx); err != nil {
		return err
	}

	if err := s.coordinator.Open(svcCtx); err != nil {
		return err
	}

	if err := s.batcher.Open(svcCtx); err != nil {
		return err
	}

	if err := s.replicator.Open(svcCtx); err != nil {
		return err
	}

	if err := s.metrics.Open(svcCtx); err != nil {
		return err
	}

	if err := s.scheduler.Open(svcCtx); err != nil {
		return err
	}

	s.scheduler.ScheduleEvery(time.Minute, "ingestor-health-check", func(ctx context.Context) error {
		metrics.IngestorHealthCheck.WithLabelValues(s.opts.Region).Set(1)
		return nil
	})

	fnStore := ingestorstorage.NewFunctions(s.opts.K8sCtrlCli, s.coordinator)
	crdStore := ingestorstorage.NewCRDHandler(s.opts.K8sCtrlCli, s.coordinator)
	for _, v := range s.opts.MetricsKustoCli {
		t := adx.NewDropUnusedTablesTask(v)
		s.scheduler.ScheduleEvery(12*time.Hour, "delete-unused-tables", func(ctx context.Context) error {
			return t.Run(ctx)
		})

		f := adx.NewSyncFunctionsTask(fnStore, v)
		s.scheduler.ScheduleEvery(time.Minute, "sync-metrics-functions", func(ctx context.Context) error {
			return f.Run(ctx)
		})

		m := adx.NewManagementCommandsTask(crdStore, v)
		s.scheduler.ScheduleEvery(10*time.Minute, "management-commands", func(ctx context.Context) error {
			return m.Run(ctx)
		})
	}

	for _, v := range s.opts.LogsKustoCli {
		f := adx.NewSyncFunctionsTask(fnStore, v)
		s.scheduler.ScheduleEvery(time.Minute, "sync-logs-functions", func(ctx context.Context) error {
			return f.Run(ctx)
		})

		m := adx.NewManagementCommandsTask(crdStore, v)
		s.scheduler.ScheduleEvery(10*time.Minute, "management-commands", func(ctx context.Context) error {
			return m.Run(ctx)
		})
	}

	return nil
}