pkg/profiling/continuous/base/windows.go (202 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package base import ( "container/list" "sync" "time" ) // WindowData the slot data under TimeWindows type WindowData[D any, R any] interface { // Reset the data content Reset() // Accept add data Accept(data D) // Get calculate the result Get() R } // LatestWindowData only save the last data in one slot type LatestWindowData[D comparable] struct { Value D } func NewLatestWindowData[D comparable]() WindowData[D, D] { return &LatestWindowData[D]{} } func (t *LatestWindowData[D]) Reset() { var empty D t.Value = empty } func (t *LatestWindowData[D]) Accept(data D) { t.Value = data } func (t *LatestWindowData[D]) Get() D { return t.Value } type TimeWindows[V any, R any] struct { data *list.List endTime *time.Time windowLocker sync.RWMutex windowGenerator func() WindowData[V, R] // mark the latest flush endTime lastFlushTime *time.Time } func NewTimeWindows[V any, R any](items []*PolicyItem, generator func() WindowData[V, R]) *TimeWindows[V, R] { var maxPeriod int for _, i := range items { if i.Period > maxPeriod { maxPeriod = i.Period } } generatorWrapper := func() WindowData[V, R] { return newWindowDataWrapper[V, R](generator) } window := &TimeWindows[V, R]{ data: list.New(), windowGenerator: generatorWrapper, } for i := 0; i < maxPeriod; i++ { window.data.PushFront(generatorWrapper()) } return window } func (t *TimeWindows[V, R]) MatchRule(policy *PolicyItem, check func(slot R) bool) (lastMatch R, isMatch bool) { t.windowLocker.RLock() defer t.windowLocker.RUnlock() needsCount := policy.Count matchedCount := 0 for e := t.data.Back(); e != nil; e = e.Prev() { getVal := e.Value.(*windowDataWrapper[V, R]).Get() if check(getVal) { matchedCount++ lastMatch = getVal } } return lastMatch, matchedCount >= needsCount } func (t *TimeWindows[V, R]) ScalePeriod(items []*PolicyItem) { var maxPeriod int for _, i := range items { if i.Period > maxPeriod { maxPeriod = i.Period } } t.windowLocker.Lock() defer t.windowLocker.Unlock() if t.data.Len() == maxPeriod { return } val := maxPeriod - t.data.Len() if val > 0 { // need scale up for i := 0; i < val; i++ { t.data.PushBack(t.windowGenerator()) } } else { // need to scale down val = -val for i := 0; i < val; i++ { t.data.Remove(t.data.Back()) } } } func (t *TimeWindows[D, R]) Add(tm time.Time, val D) { if t.endTime == nil { t.endTime = &tm } second := int(t.endTime.Sub(tm).Seconds()) if second < 0 { t.moveTo(tm) second = 0 } if second >= t.data.Len() { // add the older data, ignore it return } t.appendDataToSlot(t.data.Len()-second-1, val) } func (t *TimeWindows[D, R]) FlushMostRecentData() (R, bool) { endTime := t.endTime if !t.shouldFlush(endTime) { var empty R return empty, false } t.lastFlushTime = endTime return t.data.Back().Value.(*windowDataWrapper[D, R]).Get(), true } func (t *TimeWindows[D, R]) FlushMultipleRecentData() ([]R, bool) { endTime := t.endTime if !t.shouldFlush(endTime) { return nil, false } result := make([]R, 0) slotCount := t.data.Len() if t.lastFlushTime != nil { slotCount = int(t.endTime.Sub(*t.lastFlushTime).Seconds()) - 1 } for e := t.data.Back(); e != nil && slotCount >= 0; e = e.Prev() { if e.Value.(*windowDataWrapper[D, R]).hasData { result = append(result, e.Value.(*windowDataWrapper[D, R]).Get()) } slotCount-- } t.lastFlushTime = endTime return result, true } func (t *TimeWindows[D, R]) shouldFlush(endTime *time.Time) bool { if endTime == nil { return false } if t.lastFlushTime == nil { return true } return t.lastFlushTime != endTime && t.lastFlushTime.Before(*endTime) } func (t *TimeWindows[D, R]) moveTo(tm time.Time) { t.windowLocker.Lock() defer t.windowLocker.Unlock() addSeconds := int(tm.Sub(*t.endTime).Seconds()) if addSeconds <= 0 { // same second or older return } else if addSeconds > t.data.Len() { // out of second count for e := t.data.Front(); e != nil; e = e.Next() { e.Value.(*windowDataWrapper[D, R]).Reset() } } else { for i := 0; i < addSeconds; i++ { // remove the older data first := t.data.Remove(t.data.Front()).(*windowDataWrapper[D, R]) first.Reset() t.data.PushBack(first) } } t.endTime = &tm } func (t *TimeWindows[V, R]) appendDataToSlot(index int, data V) { t.windowLocker.RLock() defer t.windowLocker.RUnlock() if index < 0 || index > t.data.Len() { return } dataLen := t.data.Len() var element *list.Element if index < (dataLen >> 1) { d := t.data.Front() for i := 0; i < index; i++ { d = d.Next() } element = d } else { d := t.data.Back() for i := dataLen - 1; i > index; i-- { d = d.Prev() } element = d } element.Value.(*windowDataWrapper[V, R]).Accept(data) } type windowDataWrapper[D any, R any] struct { WindowData[D, R] hasData bool } func newWindowDataWrapper[D any, R any](generator func() WindowData[D, R]) *windowDataWrapper[D, R] { return &windowDataWrapper[D, R]{ WindowData: generator(), hasData: false, } } func (t *windowDataWrapper[D, R]) Reset() { t.hasData = false t.WindowData.Reset() } func (t *windowDataWrapper[D, R]) Accept(data D) { t.hasData = true t.WindowData.Accept(data) }