banyand/tsdb/bucket/queue.go (235 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package bucket
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/hashicorp/golang-lru/v2/simplelru"
"github.com/robfig/cron/v3"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
type (
// EvictFn is a closure executed on evicting an item.
EvictFn func(ctx context.Context, id interface{}) error
// OnAddRecentFn is a notifier on adding an item into the recent queue.
OnAddRecentFn func() error
)
// Queue is a LRU queue.
type Queue interface {
Touch(id fmt.Stringer) bool
Push(ctx context.Context, id fmt.Stringer, fn OnAddRecentFn) error
Remove(id fmt.Stringer)
Len() int
Volume() int
All() []fmt.Stringer
}
const (
// QueueName is identity of the queue.
QueueName = "block-queue-cleanup"
defaultRecentRatio = 0.25
defaultEvictBatchSize = 10
)
var errInvalidSize = errors.New("invalid size")
type lruQueue struct {
recent simplelru.LRUCache[fmt.Stringer, any]
frequent simplelru.LRUCache[fmt.Stringer, any]
recentEvict simplelru.LRUCache[fmt.Stringer, any]
l *logger.Logger
evictFn EvictFn
size int
recentSize int
evictSize int
lock sync.RWMutex
}
// NewQueue return a Queue for blocks eviction.
func NewQueue(l *logger.Logger, size int, maxSize int, scheduler *timestamp.Scheduler, evictFn EvictFn) (Queue, error) {
if size <= 0 {
return nil, errInvalidSize
}
recentSize := int(float64(size) * defaultRecentRatio)
evictSize := maxSize - size
recent, err := simplelru.NewLRU[fmt.Stringer, any](size, nil)
if err != nil {
return nil, err
}
frequent, err := simplelru.NewLRU[fmt.Stringer, any](size, nil)
if err != nil {
return nil, err
}
recentEvict, err := simplelru.NewLRU[fmt.Stringer, any](evictSize, nil)
if err != nil {
return nil, err
}
c := &lruQueue{
size: size,
recentSize: recentSize,
recent: recent,
frequent: frequent,
recentEvict: recentEvict,
evictSize: evictSize,
evictFn: evictFn,
l: l,
}
if err := scheduler.Register(QueueName, cron.Descriptor, "@every 5m", c.cleanEvict); err != nil {
return nil, err
}
return c, nil
}
func (q *lruQueue) Touch(id fmt.Stringer) bool {
q.lock.Lock()
defer q.lock.Unlock()
if q.frequent.Contains(id) {
if e := q.l.Debug(); e.Enabled() {
e.Stringer("id", id).Msg("get from frequent")
}
return true
}
if q.recent.Contains(id) {
if e := q.l.Debug(); e.Enabled() {
e.Stringer("id", id).Msg("promote from recent to frequent")
}
q.recent.Remove(id)
q.frequent.Add(id, nil)
return true
}
return false
}
func (q *lruQueue) Push(ctx context.Context, id fmt.Stringer, fn OnAddRecentFn) error {
q.lock.Lock()
defer q.lock.Unlock()
if q.frequent.Contains(id) {
if e := q.l.Debug(); e.Enabled() {
e.Stringer("id", id).Msg("push to frequent")
}
q.frequent.Add(id, nil)
return nil
}
if q.recent.Contains(id) {
if e := q.l.Debug(); e.Enabled() {
e.Stringer("id", id).Msg("promote from recent to frequent")
}
q.recent.Remove(id)
q.frequent.Add(id, nil)
return nil
}
if q.recentEvict.Contains(id) {
if e := q.l.Debug(); e.Enabled() {
e.Stringer("id", id).Msg("restore from recentEvict")
}
if err := q.ensureSpace(ctx, true); err != nil {
return err
}
q.recentEvict.Remove(id)
q.frequent.Add(id, nil)
return nil
}
if err := q.ensureSpace(ctx, false); err != nil {
return err
}
q.recent.Add(id, nil)
if fn == nil {
return nil
}
return fn()
}
func (q *lruQueue) Remove(id fmt.Stringer) {
q.lock.Lock()
defer q.lock.Unlock()
if q.frequent.Contains(id) {
q.frequent.Remove(id)
return
}
if q.recent.Contains(id) {
q.recent.Remove(id)
return
}
if q.recentEvict.Contains(id) {
q.recentEvict.Remove(id)
}
}
func (q *lruQueue) Len() int {
q.lock.RLock()
defer q.lock.RUnlock()
return q.recent.Len() + q.frequent.Len()
}
func (q *lruQueue) Volume() int {
return q.size + q.recentSize + q.evictSize
}
func (q *lruQueue) All() []fmt.Stringer {
q.lock.RLock()
defer q.lock.RUnlock()
all := make([]fmt.Stringer, q.recent.Len()+q.frequent.Len()+q.recentEvict.Len())
copy(all, q.recent.Keys())
copy(all[q.recent.Len():], q.frequent.Keys())
copy(all[q.recent.Len()+q.frequent.Len():], q.recentEvict.Keys())
return all
}
func (q *lruQueue) evictLen() int {
q.lock.RLock()
defer q.lock.RUnlock()
return q.recentEvict.Len()
}
func (q *lruQueue) ensureSpace(ctx context.Context, recentEvict bool) error {
recentLen := q.recent.Len()
freqLen := q.frequent.Len()
if recentLen+freqLen < q.size {
return nil
}
if recentLen > 0 && (recentLen > q.recentSize || (recentLen == q.recentSize && !recentEvict)) {
k, _, ok := q.recent.GetOldest()
if !ok {
return errors.New("failed to get oldest from recent queue")
}
if err := q.addLst(ctx, q.recentEvict, q.evictSize, k); err != nil {
return err
}
q.recent.Remove(k)
return nil
}
return q.removeOldest(ctx, q.frequent)
}
func (q *lruQueue) addLst(ctx context.Context, lst simplelru.LRUCache[fmt.Stringer, any], size int, id fmt.Stringer) error {
if lst.Len() < size {
lst.Add(id, nil)
return nil
}
if err := q.removeOldest(ctx, lst); err != nil {
return err
}
lst.Add(id, nil)
return nil
}
func (q *lruQueue) removeOldest(ctx context.Context, lst simplelru.LRUCache[fmt.Stringer, any]) error {
oldestID, _, ok := lst.GetOldest()
if ok && q.evictFn != nil {
if err := q.evictFn(ctx, oldestID); err != nil {
return err
}
_ = lst.Remove(oldestID)
}
return nil
}
func (q *lruQueue) cleanEvict(now time.Time, l *logger.Logger) bool {
if e := l.Debug(); e.Enabled() {
e.Time("now", now).Msg("block queue wakes")
}
if q.evictLen() < 1 {
return true
}
for i := 0; i < defaultEvictBatchSize; i++ {
if q.remove() {
break
}
}
return true
}
func (q *lruQueue) remove() bool {
q.lock.Lock()
defer q.lock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := q.removeOldest(ctx, q.recentEvict); err != nil {
q.l.Error().Err(err).Msg("failed to remove oldest blocks")
}
if q.recentEvict.Len() < 1 {
return true
}
return false
}