cns/ipampool/v2/monitor.go (142 lines of code) (raw):

package v2 import ( "context" "math" "sync" "time" "github.com/Azure/azure-container-networking/cns" "github.com/Azure/azure-container-networking/crd/clustersubnetstate/api/v1alpha1" "github.com/Azure/azure-container-networking/crd/nodenetworkconfig/api/v1alpha" "github.com/pkg/errors" "go.uber.org/zap" ) const ( // DefaultMaxIPs default maximum allocatable IPs on a k8s Node. DefaultMaxIPs = 250 // fieldManager is the field manager used when patching the NodeNetworkConfig. fieldManager = "azure-cns" ) type nodeNetworkConfigSpecUpdater interface { PatchSpec(context.Context, *v1alpha.NodeNetworkConfigSpec, string) (*v1alpha.NodeNetworkConfig, error) } type ipStateStore interface { GetPendingReleaseIPConfigs() []cns.IPConfigurationStatus MarkNIPsPendingRelease(n int) (map[string]cns.IPConfigurationStatus, error) } type scaler struct { batch int64 buffer float64 exhausted bool max int64 } type Monitor struct { z *zap.Logger scaler scaler nnccli nodeNetworkConfigSpecUpdater store ipStateStore demand int64 request int64 demandSource <-chan int cssSource <-chan v1alpha1.ClusterSubnetState nncSource <-chan v1alpha.NodeNetworkConfig started chan interface{} once sync.Once legacyMetricsObserver func(context.Context) error } func NewMonitor(z *zap.Logger, store ipStateStore, nnccli nodeNetworkConfigSpecUpdater, demandSource <-chan int, nncSource <-chan v1alpha.NodeNetworkConfig, cssSource <-chan v1alpha1.ClusterSubnetState) *Monitor { //nolint:lll // it's fine return &Monitor{ z: z.With(zap.String("component", "ipam-pool-monitor")), store: store, nnccli: nnccli, demandSource: demandSource, cssSource: cssSource, nncSource: nncSource, started: make(chan interface{}), legacyMetricsObserver: func(context.Context) error { return nil }, } } // Start begins the Monitor's pool reconcile loop. // On first run, it will block until a NodeNetworkConfig is received (through a call to Update()). // Subsequently, it will run run when Events happen or at least once per ReconcileDelay and attempt to re-reconcile the pool. func (pm *Monitor) Start(ctx context.Context) error { pm.z.Debug("starting") maxReconcileDelay := time.NewTicker(60 * time.Second) //nolint:gomnd // 60 seconds for { // proceed when things happen: select { case <-ctx.Done(): // calling context has closed, we'll exit. return errors.Wrap(ctx.Err(), "pool monitor context closed") case demand := <-pm.demandSource: // updated demand for IPs, recalculate request pm.demand = int64(demand) pm.z.Info("demand update", zap.Int64("demand", pm.demand)) case css := <-pm.cssSource: // received an updated ClusterSubnetState, recalculate request pm.scaler.exhausted = css.Status.Exhausted pm.z.Info("exhaustion update", zap.Bool("exhausted", pm.scaler.exhausted)) case nnc := <-pm.nncSource: // received a new NodeNetworkConfig, extract the data from it and recalculate request pm.scaler.max = int64(math.Min(float64(nnc.Status.Scaler.MaxIPCount), DefaultMaxIPs)) pm.scaler.batch = int64(math.Min(math.Max(float64(nnc.Status.Scaler.BatchSize), 1), float64(pm.scaler.max))) pm.scaler.buffer = math.Abs(float64(nnc.Status.Scaler.RequestThresholdPercent)) / 100 //nolint:gomnd // it's a percentage pm.once.Do(func() { pm.request = nnc.Spec.RequestedIPCount close(pm.started) // close the init channel the first time we fully receive a NodeNetworkConfig. pm.z.Debug("started", zap.Int64("initial request", pm.request)) }) pm.z.Info("scaler update", zap.Int64("batch", pm.scaler.batch), zap.Float64("buffer", pm.scaler.buffer), zap.Int64("max", pm.scaler.max), zap.Int64("request", pm.request)) case <-maxReconcileDelay.C: // try to reconcile the pool every maxReconcileDelay to prevent drift or lockups. } select { case <-pm.started: // this blocks until we have initialized default: // if we haven't started yet, we need to wait for the first NNC to be received. continue // jumps to the next iteration of the outer for-loop } // if control has flowed through the select(s) to this point, we can now reconcile. if err := pm.reconcile(ctx); err != nil { pm.z.Error("reconcile failed", zap.Error(err)) } if err := pm.legacyMetricsObserver(ctx); err != nil { pm.z.Error("legacy metrics observer failed", zap.Error(err)) } } } func (pm *Monitor) reconcile(ctx context.Context) error { // if the subnet is exhausted, locally overwrite the batch/minfree/maxfree in the meta copy for this iteration // (until the controlplane owns this and modifies the scaler values for us directly instead of writing "exhausted") // TODO(rbtr) s := pm.scaler if s.exhausted { s.batch = 1 s.buffer = 1 } // calculate the target state from the current pool state and scaler target := calculateTargetIPCountOrMax(pm.demand, s.batch, s.max, s.buffer) pm.z.Info("calculated new request", zap.Int64("demand", pm.demand), zap.Int64("batch", s.batch), zap.Int64("max", s.max), zap.Float64("buffer", s.buffer), zap.Int64("target", target)) delta := target - pm.request if delta == 0 { pm.z.Info("NNC already at target IPs, no scaling required") return nil } pm.z.Info("scaling pool", zap.Int64("delta", delta)) // try to release -delta IPs. this is no-op if delta is negative. if _, err := pm.store.MarkNIPsPendingRelease(int(-delta)); err != nil { return errors.Wrapf(err, "failed to mark sufficient IPs as PendingRelease, wanted %d", pm.request-target) } spec := pm.buildNNCSpec(target) if _, err := pm.nnccli.PatchSpec(ctx, &spec, fieldManager); err != nil { return errors.Wrap(err, "failed to UpdateSpec with NNC client") } pm.request = target pm.z.Info("scaled pool", zap.Int64("request", pm.request)) return nil } // buildNNCSpec translates CNS's map of IPs to be released and requested IP count into an NNC Spec. func (pm *Monitor) buildNNCSpec(request int64) v1alpha.NodeNetworkConfigSpec { // Get All Pending IPs from CNS and populate it again. pendingReleaseIPs := pm.store.GetPendingReleaseIPConfigs() spec := v1alpha.NodeNetworkConfigSpec{ RequestedIPCount: request, IPsNotInUse: make([]string, len(pendingReleaseIPs)), } for i := range pendingReleaseIPs { spec.IPsNotInUse[i] = pendingReleaseIPs[i].ID } return spec } func (pm *Monitor) WithLegacyMetricsObserver(observer func(context.Context) error) { pm.legacyMetricsObserver = observer } // calculateTargetIPCountOrMax calculates the target IP count request // using the scaling function and clamps the result at the max IPs. func calculateTargetIPCountOrMax(demand, batch, max int64, buffer float64) int64 { targetRequest := calculateTargetIPCount(demand, batch, buffer) if targetRequest > max { // clamp request at the max IPs targetRequest = max } return targetRequest } // calculateTargetIPCount calculates an IP count request based on the // current demand, batch size, and buffer. // ref: https://github.com/Azure/azure-container-networking/blob/master/docs/feature/ipammath/0-background.md // the idempotent scaling function is: // Target = Batch \times \lceil buffer + \frac{Demand}{Batch} \rceil func calculateTargetIPCount(demand, batch int64, buffer float64) int64 { return batch * int64(math.Ceil(buffer+float64(demand)/float64(batch))) }