pkg/flow/streaming/sliding_window.go (266 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package streaming
import (
"container/heap"
"context"
"math"
"sync"
"time"
lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/pkg/flow"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
// triggerResult is the decision a trigger returns after observing an element
// for a window: fire (flush the window now) or cont (do not flush now).
type triggerResult bool

const (
	// fire asks the caller to flush the window immediately.
	fire triggerResult = true
	// cont asks the caller not to flush the window for this element.
	//
	// The explicit type is required: in a Go const block a spec that has its
	// own "= value" does not inherit the previous spec's type, so without it
	// cont would be an untyped bool instead of a triggerResult.
	cont triggerResult = false
)
// Compile-time assertions that the window types implement the flow interfaces.
var (
	_ flow.Operator       = (*tumblingTimeWindows)(nil)
	_ flow.WindowAssigner = (*tumblingTimeWindows)(nil)
	_ flow.Window         = (*timeWindow)(nil)
)

// defaultCacheSize is the window-cache capacity Setup falls back to when the
// configured window count is not positive.
var defaultCacheSize = 2
// Window attaches the window assigner w to the flow and returns a windowed
// view over it. Only *tumblingTimeWindows is accepted: it receives the flow's
// error drain and logger and is appended to the flow's operators. Any other
// assigner is reported through drainErr; a WindowedFlow wrapping w is
// returned in both cases.
func (f *streamingFlow) Window(w flow.WindowAssigner) flow.WindowedFlow {
	wf := &windowedFlow{f: f, wa: w, l: f.l}
	tw, ok := w.(*tumblingTimeWindows)
	if !ok {
		f.drainErr(errors.New("window type is not supported"))
		return wf
	}
	tw.errorHandler = f.drainErr
	tw.l = f.l
	f.ops = append(f.ops, tw)
	return wf
}
// AllowedMaxWindows sets how many windows the assigner may keep cached at once
// (tumblingTimeWindows uses it as the LRU capacity in Setup, falling back to
// defaultCacheSize when it is not positive). Any other assigner type reports
// an error through the flow's drainErr and ignores the value. It returns the
// receiver for chaining.
func (s *windowedFlow) AllowedMaxWindows(windowCnt int) flow.WindowedFlow {
	if tw, ok := s.wa.(*tumblingTimeWindows); ok {
		tw.windowCount = windowCnt
		return s
	}
	s.f.drainErr(errors.New("windowCnt is not supported"))
	return s
}
// tumblingTimeWindows is a streaming operator that assigns every incoming
// record to a fixed-size time window, aggregates the records per window and
// emits window snapshots on its output channel.
type tumblingTimeWindows struct {
	l *logger.Logger
	// snapshots maps timeWindow -> flow.AggregationOp. It is an LRU cache of
	// capacity windowCount whose eviction callback (installed in Setup)
	// flushes the evicted window.
	snapshots *lru.Cache
	// timerHeap holds pending *internalTimer entries ordered by trigger time;
	// guarded by timerMu.
	timerHeap *flow.DedupPriorityQueue
	// aggregationFactory creates the aggregation for a newly seen window.
	aggregationFactory flow.AggregationOpFactory
	// in receives the records to window; out carries emitted snapshots.
	in  chan flow.StreamRecord
	out chan flow.StreamRecord
	// errorHandler receives errors raised while processing records.
	errorHandler func(error)
	flow.ComponentState
	// windowCount is the capacity of the snapshots cache.
	windowCount int
	// currentWatermark is the event-time watermark in milliseconds, derived
	// from record timestamps in receive.
	currentWatermark int64
	// lastFlushTime is the wall-clock time in milliseconds when the periodic
	// dirty-window flush clock was last (re)started.
	lastFlushTime int64
	// flushInterval is the periodic flush threshold in milliseconds:
	// 40% of windowSize, capped at the maxFlushInterval given to the constructor.
	flushInterval int64
	// windowSize is the window length in milliseconds.
	windowSize int64
	// timerMu guards timerHeap.
	timerMu sync.Mutex
}
// In exposes the send-only side of the operator's input channel.
func (s *tumblingTimeWindows) In() chan<- flow.StreamRecord {
	var inlet chan<- flow.StreamRecord = s.in
	return inlet
}
// Out exposes the receive-only side of the operator's output channel, on
// which window snapshots are emitted.
func (s *tumblingTimeWindows) Out() <-chan flow.StreamRecord {
	var outlet <-chan flow.StreamRecord = s.out
	return outlet
}
// Setup creates the window cache on first use and starts the receive loop.
//
// The cache is an LRU holding windowCount windows (defaultCacheSize when
// windowCount is not positive). When it evicts a window to make room, the
// window's aggregation is flushed downstream if it is dirty. The receive
// goroutine is registered on the embedded ComponentState so Teardown can wait
// for it.
func (s *tumblingTimeWindows) Setup(_ context.Context) (err error) {
	if s.snapshots == nil {
		if s.windowCount <= 0 {
			s.windowCount = defaultCacheSize
		}
		onEvict := func(key interface{}, value interface{}) {
			evicted := key.(timeWindow)
			flushed := s.flushSnapshot(evicted, value.(flow.AggregationOp))
			if e := s.l.Debug(); e.Enabled() {
				e.Stringer("window", evicted).Bool("flushed", flushed).Msg("evict window on lru cache is full")
			}
		}
		if s.snapshots, err = lru.NewWithEvict(s.windowCount, onEvict); err != nil {
			return err
		}
	}
	s.Add(1)
	go s.receive()
	return nil
}
// flushSnapshot sends snapshot.Snapshot() downstream, stamped with the start
// of window w, but only when snapshot.Dirty() reports true. It returns whether
// a record was sent. out is created unbuffered by NewTumblingTimeWindows, so
// the send blocks until the downstream reader takes it.
func (s *tumblingTimeWindows) flushSnapshot(w timeWindow, snapshot flow.AggregationOp) bool {
	if !snapshot.Dirty() {
		return false
	}
	s.out <- flow.NewStreamRecord(snapshot.Snapshot(), w.start)
	return true
}
// flushWindow emits the snapshot of window w if w is still cached and its
// aggregation is dirty. A window that is no longer cached is skipped; the
// eviction callback installed in Setup already flushed it when it was evicted.
//
// The lookup uses Peek instead of Get so that flushing does not count as an
// access. Get would move w to the most-recently-used slot. For a window
// flushed because its timer came due (an old window), that would make newer,
// still-active windows the eviction victims when the next window is created.
func (s *tumblingTimeWindows) flushWindow(w timeWindow) {
	snapshot, ok := s.snapshots.Peek(w)
	if !ok {
		return
	}
	flushed := s.flushSnapshot(w, snapshot.(flow.AggregationOp))
	if e := s.l.Debug(); e.Enabled() {
		e.Stringer("window", w).Bool("flushed", flushed).Msg("flush window")
	}
}
// flushDueWindows pops every timer whose trigger time is at or before the
// current watermark and flushes the timer's window. timerMu is held for the
// whole drain.
func (s *tumblingTimeWindows) flushDueWindows() {
	s.timerMu.Lock()
	defer s.timerMu.Unlock()
	for {
		next, ok := s.timerHeap.Peek().(*internalTimer)
		if !ok || next.triggerTimeMillis > s.currentWatermark {
			return
		}
		due := heap.Pop(s.timerHeap).(*internalTimer)
		s.flushWindow(due.w)
	}
}
// flushDirtyWindows attempts a flush of every window currently in the cache;
// windows whose aggregation is not dirty emit nothing.
func (s *tumblingTimeWindows) flushDirtyWindows() {
	cached := s.snapshots.Keys()
	for i := range cached {
		s.flushWindow(cached[i].(timeWindow))
	}
}
// receive is the operator's processing loop, started by Setup. Each record
// read from in is added to the aggregation of its window(s). Then the
// watermark is advanced and due windows are flushed. When in is closed the
// loop ends and out is closed.
func (s *tumblingTimeWindows) receive() {
	defer s.Done()
	for elem := range s.in {
		if !s.accumulate(elem) {
			// The record could not be assigned to a window. The error was
			// reported, and the record must not influence the watermark.
			continue
		}
		s.advanceWatermark(elem.TimestampMillis())
	}
	close(s.out)
}

// accumulate adds elem to the aggregation of every window it is assigned to,
// creating the aggregation when the window is not cached yet. Windows rejected
// by isWindowLate are skipped, and a window whose event-time trigger fires is
// flushed right away. It returns false only when AssignWindows fails; the
// error has then been passed to errorHandler.
func (s *tumblingTimeWindows) accumulate(elem flow.StreamRecord) bool {
	assignedWindows, err := s.AssignWindows(elem.TimestampMillis())
	if err != nil {
		s.errorHandler(err)
		return false
	}
	tc := triggerContext{
		delegation: s,
	}
	for _, w := range assignedWindows {
		// drop the record for this window if the window is late
		if s.isWindowLate(w) {
			continue
		}
		tw := w.(timeWindow)
		tc.window = tw
		// add elem to the window's aggregation
		if oldAggr, ok := s.snapshots.Get(tw); ok {
			oldAggr.(flow.AggregationOp).Add([]flow.StreamRecord{elem})
		} else {
			newAggr := s.aggregationFactory()
			newAggr.Add([]flow.StreamRecord{elem})
			// Adding may evict the least recently used window; the eviction
			// callback installed in Setup flushes it.
			s.snapshots.Add(tw, newAggr)
			if e := s.l.Debug(); e.Enabled() {
				e.Stringer("window", tw).Msg("create new window")
			}
		}
		if tc.OnElement(elem) == fire {
			s.flushWindow(tw)
		}
	}
	return true
}

// advanceWatermark updates the watermark with the event time ts of the record
// just processed and runs the time-driven flushes. Work is done when the
// record moves event time forward or when wall-clock time has passed since
// the periodic flush clock was last started.
//
// The watermark only moves forward. An out-of-order record (ts below the
// watermark) leaves it unchanged. Lowering it would relax isWindowLate and
// delay timers that are already due.
func (s *tumblingTimeWindows) advanceWatermark(ts int64) {
	now := time.Now().UnixNano() / int64(time.Millisecond)
	pastDataDur := ts - s.currentWatermark
	if s.lastFlushTime == 0 {
		s.lastFlushTime = now
	}
	pastDur := now - s.lastFlushTime
	if pastDur <= 0 && pastDataDur <= 0 {
		return
	}
	previousWaterMark := s.currentWatermark
	if pastDataDur > 0 {
		s.currentWatermark = ts
	}
	// Assume the current watermark is t. Late records are tolerated by not
	// purging windows early: only windows whose flush trigger time is less
	// than or equal to t are flushed, i.e. triggerTime <= t.
	s.flushDueWindows()
	// Flush dirty windows when necessary. The flush interval is 40% of the
	// window size, so a record in a given time bucket is roughly persisted
	// twice:
	// |---------------------------------|
	// |    40%     |     40%     |  20%  |
	// |          flush         flush     |
	// |---------------------------------|
	// The interval is capped at the maxFlushInterval passed to
	// NewTumblingTimeWindows.
	if pastDur > s.flushInterval || (previousWaterMark > 0 && pastDataDur > s.flushInterval) {
		s.lastFlushTime = now
		s.flushDirtyWindows()
	}
}
// isWindowLate reports whether records for window w should be dropped. A
// window is late only when all of the following hold:
//  1. its max timestamp is at or before the current watermark,
//  2. the LRU cache is full (it holds at least windowCount windows), and
//  3. the cache has no entry for w.
//
// A past window is therefore still accepted while it is cached, or while the
// cache has spare room.
func (s *tumblingTimeWindows) isWindowLate(w flow.Window) bool {
	if w.MaxTimestamp() > s.currentWatermark {
		return false
	}
	if s.snapshots.Len() < s.windowCount {
		return false
	}
	return !s.snapshots.Contains(w)
}
// Teardown blocks on the embedded ComponentState until the goroutines
// registered by Setup (the receive loop) and Exec (the transmitter) have
// finished. It always returns nil.
func (s *tumblingTimeWindows) Teardown(_ context.Context) error {
	// Wait rather than cancel: the goroutines end once in is closed and the
	// output has been drained.
	s.Wait()
	return nil
}
// Exec wires the operator to downstream. It runs flow.Transmit, which
// presumably relays this operator's output to downstream, in a goroutine
// registered on the embedded ComponentState so Teardown waits for it.
func (s *tumblingTimeWindows) Exec(downstream flow.Inlet) {
	state := &s.ComponentState
	state.Add(1)
	go flow.Transmit(state, downstream, s)
}
// NewTumblingTimeWindows returns a tumbling (fixed-size, non-overlapping)
// time-window assigner.
//
// size is the window length. maxFlushInterval caps the periodic dirty-window
// flush interval, which is otherwise 40% of size. Both durations are truncated
// to whole milliseconds. size must be at least one millisecond: window
// assignment computes timestamp % windowSize, which panics on a zero window
// size.
func NewTumblingTimeWindows(size time.Duration, maxFlushInterval time.Duration) flow.WindowAssigner {
	ws := size.Milliseconds()
	it := int64(float64(ws) * 0.4)
	if mfi := maxFlushInterval.Milliseconds(); it > mfi {
		it = mfi
	}
	return &tumblingTimeWindows{
		windowSize: ws,
		// Order timers by trigger time. Compare instead of subtracting: the
		// int64 difference can overflow, and converting it to int truncates
		// on 32-bit platforms, either of which corrupts the heap order.
		timerHeap: flow.NewPriorityQueue(func(a, b interface{}) int {
			ta := a.(*internalTimer).triggerTimeMillis
			tb := b.(*internalTimer).triggerTimeMillis
			switch {
			case ta < tb:
				return -1
			case ta > tb:
				return 1
			default:
				return 0
			}
		}, false),
		in:               make(chan flow.StreamRecord),
		out:              make(chan flow.StreamRecord),
		currentWatermark: 0,
		flushInterval:    it,
	}
}
// timeWindow is the half-open event-time interval [start, end), with both
// bounds in Unix milliseconds.
type timeWindow struct {
	start, end int64
}

// MaxTimestamp returns the last millisecond still inside the window.
func (t timeWindow) MaxTimestamp() int64 {
	return t.end - 1
}

// String renders the window as "<start> - <end>", each bound formatted as a
// local-time time.Time.
func (t timeWindow) String() string {
	render := func(ms int64) string {
		return time.Unix(0, ms*int64(time.Millisecond)).String()
	}
	return render(t.start) + " - " + render(t.end)
}
// AssignWindows maps an event timestamp (Unix milliseconds) to exactly one
// tumbling window, starting at getWindowStart(timestamp, windowSize) and
// spanning windowSize milliseconds. math.MinInt64 is rejected as an invalid
// timestamp.
func (s *tumblingTimeWindows) AssignWindows(timestamp int64) ([]flow.Window, error) {
	if timestamp == math.MinInt64 {
		return nil, errors.New("invalid timestamp from the element")
	}
	start := getWindowStart(timestamp, s.windowSize)
	w := timeWindow{start: start, end: start + s.windowSize}
	return []flow.Window{w}, nil
}
// getWindowStart returns the start of the tumbling window of length
// windowSize that contains timestamp, i.e. the largest multiple of windowSize
// that is <= timestamp.
//
// Go's % truncates toward zero, so the remainder of a negative timestamp is
// negative. It is shifted into [0, windowSize) so that pre-epoch timestamps
// round down. Rounding toward zero would instead place them in the window
// after the one that contains them.
//
// windowSize must be positive; a zero windowSize panics with an integer
// division by zero.
func getWindowStart(timestamp, windowSize int64) int64 {
	remainder := timestamp % windowSize
	if remainder < 0 {
		remainder += windowSize
	}
	return timestamp - remainder
}
// eventTimeTriggerOnElement is the event-time trigger. If the watermark has
// already reached the window's max timestamp, the window fires immediately.
// Otherwise a timer is registered at that max timestamp and cont is returned.
func eventTimeTriggerOnElement(window timeWindow, ctx *triggerContext) triggerResult {
	maxTS := window.MaxTimestamp()
	if maxTS > ctx.GetCurrentWatermark() {
		ctx.RegisterEventTimeTimer(maxTS)
		return cont
	}
	return fire
}
// triggerContext gives a trigger access to the operator's watermark and timer
// heap while it evaluates an element for window.
type triggerContext struct {
	delegation *tumblingTimeWindows
	window     timeWindow
}

// GetCurrentWatermark returns the operator's current event-time watermark.
func (tc *triggerContext) GetCurrentWatermark() int64 {
	return tc.delegation.currentWatermark
}

// RegisterEventTimeTimer pushes a timer for tc.window at triggerTime onto the
// operator's timer heap, under the operator's timerMu.
func (tc *triggerContext) RegisterEventTimeTimer(triggerTime int64) {
	op := tc.delegation
	timer := &internalTimer{
		w:                 tc.window,
		triggerTimeMillis: triggerTime,
	}
	op.timerMu.Lock()
	defer op.timerMu.Unlock()
	heap.Push(op.timerHeap, timer)
}

// OnElement runs the event-time trigger for tc.window; the record itself is
// not inspected.
func (tc *triggerContext) OnElement(_ flow.StreamRecord) triggerResult {
	return eventTimeTriggerOnElement(tc.window, tc)
}
// internalTimer is a timer-heap entry asking for window w to be flushed once
// the watermark reaches triggerTimeMillis. index records the entry's heap
// position; presumably the priority queue keeps it current via SetIndex.
type internalTimer struct {
	w                 timeWindow
	triggerTimeMillis int64
	index             int
}

// Compile-time assertion that *internalTimer can be stored in the flow queue.
var _ flow.Element = (*internalTimer)(nil)

// GetIndex returns the stored heap position.
func (tm *internalTimer) GetIndex() int {
	return tm.index
}

// SetIndex stores the heap position idx.
func (tm *internalTimer) SetIndex(idx int) {
	tm.index = idx
}