banyand/tsdb/bucket/strategy.go (137 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package bucket import ( "fmt" "math" "sync/atomic" "github.com/pkg/errors" "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" ) var ( // ErrInvalidParameter denotes input parameters are invalid. ErrInvalidParameter = errors.New("parameters are invalid") // ErrNoMoreBucket denotes the bucket volume reaches the limitation. ErrNoMoreBucket = errors.New("no more buckets") ) type ratio float64 // Strategy controls Reporters with Controller's help. type Strategy struct { optionsErr error ctrl Controller current atomic.Value logger *logger.Logger closer *run.Closer ratio ratio currentRatio uint64 } // StrategyOptions sets how to create a Strategy. type StrategyOptions func(*Strategy) // WithNextThreshold sets a ratio to creat the next Reporter. func WithNextThreshold(r ratio) StrategyOptions { return func(s *Strategy) { if r > 1.0 { s.optionsErr = multierr.Append(s.optionsErr, errors.Wrapf(ErrInvalidParameter, "ratio %v is more than 1.0", r)) return } s.ratio = r } } // WithLogger sets a logger.Logger. func WithLogger(logger *logger.Logger) StrategyOptions { return func(s *Strategy) { s.logger = logger } } // NewStrategy returns a Strategy. func NewStrategy(ctrl Controller, options ...StrategyOptions) (*Strategy, error) { if ctrl == nil { return nil, errors.Wrap(ErrInvalidParameter, "controller is absent") } strategy := &Strategy{ ctrl: ctrl, ratio: 0.8, closer: run.NewCloser(1), } for _, opt := range options { opt(strategy) } if strategy.optionsErr != nil { return nil, strategy.optionsErr } if strategy.logger == nil { strategy.logger = logger.GetLogger("bucket-strategy") } if err := strategy.resetCurrent(); err != nil { return nil, err } return strategy, nil } func (s *Strategy) resetCurrent() error { c, err := s.ctrl.Current() if err != nil { return err } s.current.Store(c) return nil } // Run the Strategy in the background. func (s *Strategy) Run() { go func(s *Strategy) { defer s.closer.Done() for { c, err := s.current.Load().(Reporter).Report() if errors.Is(err, errReporterClosed) { return } if err != nil { s.logger.Error().Err(err).Msg("failed to get reporter") if err := s.resetCurrent(); err != nil { panic(err) } continue } if !s.observe(c) { return } } }(s) } func (s *Strategy) String() string { c := s.current.Load() if c == nil { return "nil" } return fmt.Sprintf("%s:%f", c.(Reporter).String(), math.Float64frombits(atomic.LoadUint64(&s.currentRatio))) } func (s *Strategy) observe(c Channel) bool { var next Reporter moreBucket := true for { select { case status, more := <-c: if !more { return moreBucket } r := ratio(status.Volume) / ratio(status.Capacity) atomic.StoreUint64(&s.currentRatio, math.Float64bits(float64(r))) if r >= s.ratio && next == nil && moreBucket { n, err := s.ctrl.Next() if errors.Is(err, ErrNoMoreBucket) { moreBucket = false } else if err != nil { s.logger.Err(err).Msg("failed to create the next bucket") } else { s.logger.Info().Stringer("next", n).Msg("created the next bucket") next = n } } if r >= 1.0 { s.ctrl.OnMove(s.current.Load().(Reporter), next) if next != nil { s.current.Store(next) } return moreBucket } case <-s.closer.CloseNotify(): return false } } } // Close the Strategy running in the background. func (s *Strategy) Close() { s.closer.CloseThenWait() }