pkg/scheduler/scheduler.go (60 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE.txt file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package scheduler import ( "sync" "github.com/go-logr/logr" "github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/client" "github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/log" ) type Scheduler struct { logger logr.Logger wg *sync.WaitGroup sources []Job } // Start starts all the metric sources. func (s *Scheduler) Start() *Scheduler { for _, source := range s.sources { source.start() } return s } func (s *Scheduler) WithMetricListeners(listeners ...MetricListener) *Scheduler { for i := range s.sources { for j := range listeners { s.sources[i].WithMetricListeners(listeners[j]) } } return s } func (s *Scheduler) WithErrorListeners(listeners ...ErrorListener) *Scheduler { for i := range s.sources { for j := range listeners { s.sources[i].WithErrorListeners(listeners[j]) } } return s } // NewScheduler creates a new scheduler with an initial set of clients. func NewScheduler(clients ...client.Interface) *Scheduler { scheduler := &Scheduler{ logger: log.ForPackage("scheduler"), wg: &sync.WaitGroup{}, sources: make([]Job, len(clients)), } for i := range clients { scheduler.sources[i] = newMetricJob(clients[i], scheduler.wg) } scheduler.wg.Add(len(clients)) return scheduler } // WithClients adds more metrics clients to the scheduler. func (s *Scheduler) WithClients(clients ...client.Interface) *Scheduler { for i := range clients { source := newMetricJob(clients[i], s.wg) s.sources = append(s.sources, source) } s.wg.Add(len(clients)) return s } // WaitInitialSync blocks until all the func (s *Scheduler) WaitInitialSync() *Scheduler { s.logger.Info("Wait until an initial metric list is grabbed from metric clients", "sources_count", len(s.sources)) s.wg.Wait() s.logger.Info("Initial metric list is grabbed from metric clients", "sources_count", len(s.sources)) return s }