pkg/exporter/exporter.go (175 lines of code) (raw):

/* MIT License Copyright (c) Microsoft Corporation. */ package exporter import ( "fmt" "time" "github.com/Azure/kubernetes-carbon-intensity-exporter/pkg/sdk/client" "github.com/antihax/optional" "golang.org/x/net/context" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" ) var ( constantBackoff = wait.Backoff{ Duration: 3 * time.Second, Steps: 10, } ) type Exporter struct { clusterClient clientset.Interface apiClient *client.APIClient recorder record.EventRecorder } func New(clusterClient clientset.Interface, apiClient *client.APIClient, recorder record.EventRecorder) (*Exporter, error) { b := &Exporter{ clusterClient: clusterClient, apiClient: apiClient, recorder: recorder, } return b, nil } func (e *Exporter) Run(ctx context.Context, configMapName, region string, patrolInterval time.Duration, stopChan <-chan struct{}) { // create configMap first time err := e.RefreshData(ctx, configMapName, region) if err != nil { return } informerFactory := informers.NewSharedInformerFactory(e.clusterClient, time.Hour*1) configMapInformer := informerFactory.Core().V1().ConfigMaps().Informer() // Create a channel to receive events from the informer eventChan := make(chan interface{}) defer close(eventChan) configMapInformer.AddEventHandler( cache.ResourceEventHandlerFuncs{ DeleteFunc: func(obj interface{}) { cm, ok := obj.(*corev1.ConfigMap) if !ok { return } if cm.ObjectMeta.Name != configMapName { return } eventChan <- obj }, }) go configMapInformer.Run(stopChan) // Wait for the informer to sync if !cache.WaitForCacheSync(stopChan, configMapInformer.HasSynced) { runtime.HandleError(fmt.Errorf("timed out waiting for cache to sync")) return } refreshPatrol := time.NewTicker(patrolInterval) defer refreshPatrol.Stop() for { select { case <-eventChan: err := e.RefreshData(ctx, configMapName, region) if err != nil { return } e.recorder.Eventf(&corev1.ObjectReference{ Kind: "Pod", Namespace: client.Namespace, Name: client.PodName, }, corev1.EventTypeWarning, "Configmap Deleted", "Configmap got deleted") // if refresh time elapsed case <-refreshPatrol.C: err := e.DeleteConfigMap(ctx, configMapName) if err != nil && !apierrors.IsNotFound(err) { break } e.recorder.Eventf(&corev1.ObjectReference{ Kind: "Pod", Namespace: client.Namespace, Name: client.PodName, }, corev1.EventTypeNormal, "Configmap updated", "Configmap got updated") // context got canceled or done case <-ctx.Done(): case <-stopChan: return } } } func (e *Exporter) RefreshData(ctx context.Context, configMapName string, region string) error { // get current object (if any) in case we could not update the data. currentConfigMap, err := e.GetConfigMap(ctx, configMapName) if err != nil { return err } //delete old configmap if any err = e.DeleteConfigMap(ctx, configMapName) if err != nil && !apierrors.IsNotFound(err) { return err } var forecast []client.EmissionsForecastDto err = retry.OnError(constantBackoff, func(err error) bool { return true }, func() error { forecast, err = e.getCurrentForecastData(ctx, region) return err }) if err != nil { if currentConfigMap != nil { // return old data with failed message return e.UseCurrentConfigMap(ctx, err.Error(), currentConfigMap) } else { e.recorder.Eventf(&corev1.ObjectReference{ Kind: "Pod", Namespace: client.Namespace, Name: client.PodName, }, corev1.EventTypeWarning, "Cannot retrieve updated forecast data", "Error while retrieving updated forecast data") klog.Errorf("an error has occurred while retrieving updated forecast data") return err } } err = retry.OnError(constantBackoff, func(err error) bool { return true }, func() error { return e.CreateConfigMapFromEmissionForecast(ctx, configMapName, forecast) }) if err != nil { e.recorder.Eventf(&corev1.ObjectReference{ Kind: "Pod", Namespace: client.Namespace, Name: client.PodName, }, corev1.EventTypeWarning, "Configmap Create", "Error while creating configMap") klog.Errorf("an error has occurred while creating %s configMap, err: %s", configMapName, err.Error()) return err } e.recorder.Eventf(&corev1.ObjectReference{ Kind: "Pod", Namespace: client.Namespace, Name: client.PodName, }, corev1.EventTypeNormal, "Exporter results", "Done retrieve data") return nil } func (e *Exporter) UseCurrentConfigMap(ctx context.Context, message string, currentConfigMap *corev1.ConfigMap) error { if currentConfigMap.Data != nil { currentConfigMap.Data[ConfigMapLastHeartbeatTime] = time.Now().String() currentConfigMap.Data[ConfigMapMessage] = message } else { currentConfigMap.Data = map[string]string{ ConfigMapLastHeartbeatTime: time.Now().String(), ConfigMapMessage: message, } } if currentConfigMap.BinaryData == nil { currentConfigMap.BinaryData = map[string][]byte{ BinaryData: {}, } } return e.CreateConfigMapFromProperties(ctx, currentConfigMap.Name, currentConfigMap.Data, currentConfigMap.BinaryData[BinaryData]) } func (e *Exporter) getCurrentForecastData(ctx context.Context, region string) ([]client.EmissionsForecastDto, error) { opt := &client.CarbonAwareApiGetCurrentForecastDataOpts{ DataStartAt: optional.EmptyTime(), DataEndAt: optional.EmptyTime(), } forecast, _, err := e.apiClient.CarbonAwareApi.GetCurrentForecastData(ctx, []string{region}, opt) if err != nil { klog.ErrorS(err, "error while getting current forecast data") return nil, err } return forecast, nil }