pkg/monitor/worker.go (230 lines of code) (raw):

package monitor // Copyright (c) Microsoft Corporation. // Licensed under the Apache License 2.0. import ( "context" "fmt" "reflect" "sync" "time" "github.com/Azure/go-autorest/autorest/azure" "github.com/sirupsen/logrus" "k8s.io/client-go/rest" "github.com/Azure/ARO-RP/pkg/api" "github.com/Azure/ARO-RP/pkg/hive" "github.com/Azure/ARO-RP/pkg/monitor/azure/nsg" "github.com/Azure/ARO-RP/pkg/monitor/cluster" "github.com/Azure/ARO-RP/pkg/monitor/dimension" "github.com/Azure/ARO-RP/pkg/monitor/monitoring" utillog "github.com/Azure/ARO-RP/pkg/util/log" "github.com/Azure/ARO-RP/pkg/util/recover" "github.com/Azure/ARO-RP/pkg/util/restconfig" ) // nsgMonitoringFrequency is used for initializing NSG monitoring ticker var nsgMonitoringFrequency = 10 * time.Minute // This function will continue to run until such time as it has a config to add to the global Hive shard map // Note that because the mon.hiveShardConfigs[shard] is set to `nil` when its created, the cluster // monitors will simply ignore Hive stats until this function populates the config func (mon *monitor) populateHiveShardRestConfig(ctx context.Context, shard int) { var hiveRestConfig *rest.Config var err error for { hiveRestConfig, err = mon.liveConfig.HiveRestConfig(ctx, shard) if hiveRestConfig != nil { mon.setHiveShardConfig(shard, hiveRestConfig) return } mon.baseLog.Warn(fmt.Sprintf("error fetching Hive kubeconfig for shard %d", shard)) if err != nil { mon.baseLog.Error(err.Error()) } mon.baseLog.Info("pausing for a minute before retrying...") time.Sleep(60 * time.Second) } } // listBuckets reads our bucket allocation from the master func (mon *monitor) listBuckets(ctx context.Context) error { dbMonitors, err := mon.dbGroup.Monitors() if err != nil { return err } buckets, err := dbMonitors.ListBuckets(ctx) mon.mu.Lock() defer mon.mu.Unlock() oldBuckets := mon.buckets mon.buckets = make(map[int]struct{}, len(buckets)) for _, i := range buckets { mon.buckets[i] = struct{}{} } if !reflect.DeepEqual(mon.buckets, oldBuckets) { mon.baseLog.Printf("servicing %d buckets", len(mon.buckets)) mon.fixDocs() } return err } // changefeed tracks the OpenShiftClusters change feed and keeps mon.docs // up-to-date. We don't monitor clusters in Creating state, hence we don't add // them to mon.docs. We also don't monitor clusters in Deleting state; when // this state is reached we delete from mon.docs func (mon *monitor) changefeed(ctx context.Context, baseLog *logrus.Entry, stop <-chan struct{}) { defer recover.Panic(baseLog) dbOpenShiftClusters, err := mon.dbGroup.OpenShiftClusters() if err != nil { baseLog.Error(err) panic(err) } dbSubscriptions, err := mon.dbGroup.Subscriptions() if err != nil { baseLog.Error(err) panic(err) } clustersIterator := dbOpenShiftClusters.ChangeFeed() subscriptionsIterator := dbSubscriptions.ChangeFeed() // Align this time with the deletion mechanism. // Go to docs/monitoring.md for the details. t := time.NewTicker(10 * time.Second) defer t.Stop() for { successful := true for { docs, err := clustersIterator.Next(ctx, -1) if err != nil { successful = false baseLog.Error(err) break } if docs == nil { break } mon.mu.Lock() for _, doc := range docs.OpenShiftClusterDocuments { ps := doc.OpenShiftCluster.Properties.ProvisioningState fps := doc.OpenShiftCluster.Properties.FailedProvisioningState switch { case ps == api.ProvisioningStateCreating, ps == api.ProvisioningStateDeleting, ps == api.ProvisioningStateFailed && (fps == api.ProvisioningStateCreating || fps == api.ProvisioningStateDeleting): mon.deleteDoc(doc) default: // in the future we will have the shard index set on the api.OpenShiftClusterDocument // but for now we simply select Hive (AKS) shard 1 // e.g. shard := mon.hiveShardConfigs[doc.shardIndex] shard := 1 _, exists := mon.getHiveShardConfig(shard) if !exists { // set this to `nil` so cluster monitors will ignore it until its populated with config mon.setHiveShardConfig(shard, nil) go mon.populateHiveShardRestConfig(ctx, shard) } // TODO: improve memory usage by storing a subset of doc in mon.docs mon.upsertDoc(doc) } } mon.mu.Unlock() } for { subs, err := subscriptionsIterator.Next(ctx, -1) if err != nil { successful = false baseLog.Error(err) break } if subs == nil { break } mon.mu.Lock() for _, sub := range subs.SubscriptionDocuments { mon.subs[sub.ID] = sub } mon.mu.Unlock() } if successful { mon.lastChangefeed.Store(time.Now()) } select { case <-t.C: case <-stop: return } } } // worker reads clusters to be monitored and monitors them func (mon *monitor) worker(stop <-chan struct{}, delay time.Duration, id string) { defer recover.Panic(mon.baseLog) time.Sleep(delay) var r azure.Resource log := mon.baseLog { mon.mu.RLock() v := mon.docs[id] mon.mu.RUnlock() if v == nil { return } log = utillog.EnrichWithResourceID(log, v.doc.OpenShiftCluster.ID) var err error r, err = azure.ParseResourceID(v.doc.OpenShiftCluster.ID) if err != nil { log.Error(err) return } } log.Debug("starting monitoring") nsgMonitoringTicker := time.NewTicker(nsgMonitoringFrequency) defer nsgMonitoringTicker.Stop() t := time.NewTicker(time.Minute) defer t.Stop() h := time.Now().Hour() out: for { mon.mu.RLock() v := mon.docs[id] sub := mon.subs[r.SubscriptionID] mon.mu.RUnlock() if v == nil { break } newh := time.Now().Hour() // TODO: later can modify here to poll once per N minutes and re-issue // cached metrics in the remaining minutes if sub != nil && sub.Subscription != nil && sub.Subscription.State != api.SubscriptionStateSuspended && sub.Subscription.State != api.SubscriptionStateWarned { mon.workOne(context.Background(), log, v.doc, sub, newh != h, nsgMonitoringTicker) } select { case <-t.C: case <-stop: break out } h = newh } log.Debug("stopping monitoring") } // workOne checks the API server health of a cluster func (mon *monitor) workOne(ctx context.Context, log *logrus.Entry, doc *api.OpenShiftClusterDocument, sub *api.SubscriptionDocument, hourlyRun bool, nsgMonTicker *time.Ticker) { ctx, cancel := context.WithTimeout(ctx, 50*time.Second) defer cancel() restConfig, err := restconfig.RestConfig(mon.dialer, doc.OpenShiftCluster) if err != nil { log.Error(err) return } // once sharding is implemented, we will have the shard set on the api.OpenShiftClusterDocument // e.g. shard := mon.hiveShardConfigs[doc.shardIndex] shard := 1 hiveRestConfig, exists := mon.getHiveShardConfig(shard) if !exists { log.Warnf("no hiveShardConfigs set for shard %d", shard) } dims := map[string]string{ dimension.ClusterResourceID: doc.OpenShiftCluster.ID, dimension.Location: doc.OpenShiftCluster.Location, dimension.SubscriptionID: sub.ID, } var monitors []monitoring.Monitor var wg sync.WaitGroup hiveClusterManager, _ := hive.NewFromConfigClusterManager(log, mon.env, hiveRestConfig) nsgMon := nsg.NewMonitor(log, doc.OpenShiftCluster, mon.env, sub.ID, sub.Subscription.Properties.TenantID, mon.clusterm, dims, &wg, nsgMonTicker.C) c, err := cluster.NewMonitor(log, restConfig, doc.OpenShiftCluster, doc, mon.env, sub.Subscription.Properties.TenantID, mon.clusterm, hiveRestConfig, hourlyRun, &wg, hiveClusterManager) if err != nil { log.Error(err) mon.m.EmitGauge("monitor.cluster.failedworker", 1, map[string]string{ "resourceId": doc.OpenShiftCluster.ID, }) return } monitors = append(monitors, c, nsgMon) allJobsDone := make(chan bool) go execute(ctx, allJobsDone, &wg, monitors) select { case <-allJobsDone: case <-ctx.Done(): log.Infof("The monitoring process for cluster %s has timed out.", doc.OpenShiftCluster.ID) mon.m.EmitGauge("monitor.main.timedout", int64(1), dims) } } func execute(ctx context.Context, done chan<- bool, wg *sync.WaitGroup, monitors []monitoring.Monitor) { for _, monitor := range monitors { wg.Add(1) go monitor.Monitor(ctx) } wg.Wait() done <- true }