pkg/scheduler/job.go (108 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE.txt file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package scheduler import ( "sync" "time" "github.com/go-logr/logr" "github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/client" "github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/config" "github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/log" ) type Job interface { start() GetClient() client.Interface WithMetricListeners(listeners ...MetricListener) Job WithErrorListeners(listeners ...ErrorListener) Job } var _ Job = &metricJob{} func newMetricJob(c client.Interface, wg *sync.WaitGroup) Job { return &metricJob{ logger: log.ForPackage("job"), c: c, wg: wg, } } type metricJob struct { logger logr.Logger c client.Interface wg *sync.WaitGroup syncDone sync.Once listeners []MetricListener errorListeners []ErrorListener } func (m *metricJob) start() { go func() { // Attempt to get a first set of metrics m.refreshMetrics() dateTicker := time.NewTicker(1 * time.Minute) for range dateTicker.C { m.refreshMetrics() } }() } func (m *metricJob) refreshMetrics() { if m.GetClient().GetConfiguration().MetricTypes.HasType(config.CustomMetricType) { customMetrics, err := m.c.ListCustomMetricInfos() if err != nil { m.logger.Error(err, "Failed to update custom metric list", "client_name", m.GetClient().GetConfiguration().Name, "client_host", m.GetClient().GetConfiguration().ClientConfig.Host, ) m.publishError(config.CustomMetricType, err) return } m.logger.V(1).Info( "Refreshed custom metrics", "count", len(customMetrics), "client_name", m.GetClient().GetConfiguration().Name, "client_host", m.GetClient().GetConfiguration().ClientConfig.Host, ) for _, listener := range m.listeners { listener.UpdateCustomMetrics(m.c, customMetrics) } } if m.GetClient().GetConfiguration().MetricTypes.HasType(config.ExternalMetricType) { externalMetrics, err := m.c.ListExternalMetrics() if err != nil { m.logger.Error(err, "Failed to update external metric list", "client_name", m.GetClient().GetConfiguration().Name, "client_host", m.GetClient().GetConfiguration().ClientConfig.Host, ) m.publishError(config.ExternalMetricType, err) return } m.logger.V(1).Info( "Refreshed external metrics", "metrics_count", len(externalMetrics), "client_name", m.GetClient().GetConfiguration().Name, "client_host", m.GetClient().GetConfiguration().ClientConfig.Host, ) for _, listener := range m.listeners { listener.UpdateExternalMetrics(m.c, externalMetrics) } } m.syncDone.Do(func() { m.logger.V(1).Info( "First sync successful", "client_name", m.GetClient().GetConfiguration().Name, "client_host", m.GetClient().GetConfiguration().ClientConfig.Host, ) m.wg.Done() }) } func (m *metricJob) publishError(metricType config.MetricType, err error) { for _, listener := range m.errorListeners { listener.OnError(m.c, metricType, err) } } func (m *metricJob) GetClient() client.Interface { return m.c } func (m *metricJob) WithMetricListeners(listeners ...MetricListener) Job { m.listeners = append(m.listeners, listeners...) return m } func (m *metricJob) WithErrorListeners(listeners ...ErrorListener) Job { m.errorListeners = append(m.errorListeners, listeners...) return m }