banyand/stream/tstable.go (125 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package stream import ( "fmt" "path" "sync" "time" "github.com/dgraph-io/badger/v3" "github.com/dgraph-io/badger/v3/skl" "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/kv" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/logger" ) const ( defaultNumBufferShards = 2 defaultWriteConcurrency = 1000 id = "tst" ) var _ tsdb.TSTable = (*tsTable)(nil) type tsTable struct { sst kv.TimeSeriesStore *tsdb.BlockExpiryTracker l *logger.Logger buffer *tsdb.Buffer closeBufferTimer *time.Timer position common.Position bufferSize int64 lock sync.Mutex } func (t *tsTable) SizeOnDisk() int64 { return t.sst.SizeOnDisk() } func (t *tsTable) openBuffer() (err error) { t.lock.Lock() defer t.lock.Unlock() if t.buffer != nil { return nil } bufferSize := int(t.bufferSize / defaultNumBufferShards) if t.buffer, err = tsdb.NewBuffer(t.l, t.position, bufferSize, defaultWriteConcurrency, defaultNumBufferShards, t.flush); err != nil { return fmt.Errorf("failed to create buffer: %w", err) } end := t.EndTime() now := time.Now() closeAfter := end.Sub(now) if now.After(end) { closeAfter = t.BlockExpiryDuration() } t.closeBufferTimer = time.AfterFunc(closeAfter, func() { if t.l.Debug().Enabled() { t.l.Debug().Msg("closing buffer") } t.lock.Lock() defer t.lock.Unlock() if t.buffer == nil { return } if err := t.buffer.Close(); err != nil { t.l.Error().Err(err).Msg("close buffer error") } t.buffer = nil }) return nil } func (t *tsTable) Close() (err error) { t.lock.Lock() defer t.lock.Unlock() if t.buffer != nil { err = multierr.Append(err, t.buffer.Close()) } return multierr.Combine(err, t.sst.Close()) } func (t *tsTable) CollectStats() *badger.Statistics { return t.sst.CollectStats() } func (t *tsTable) Get(key []byte, ts time.Time) ([]byte, error) { if v, ok := t.buffer.Read(key, ts); ok { return v, nil } return t.sst.Get(key, uint64(ts.UnixNano())) } func (t *tsTable) Put(key []byte, val []byte, ts time.Time) error { if t.buffer != nil { t.buffer.Write(key, val, ts) return nil } if err := t.openBuffer(); err != nil { return err } t.buffer.Write(key, val, ts) return nil } func (t *tsTable) flush(shardIndex int, skl *skl.Skiplist) error { t.l.Info().Int("shard", shardIndex).Msg("flushing buffer") return t.sst.Handover(skl) } var _ tsdb.TSTableFactory = (*tsTableFactory)(nil) type tsTableFactory struct { bufferSize int64 compressionMethod databasev1.CompressionMethod chunkSize int } func (ttf *tsTableFactory) NewTSTable(blockExpiryTracker tsdb.BlockExpiryTracker, root string, position common.Position, l *logger.Logger) (tsdb.TSTable, error) { sst, err := kv.OpenTimeSeriesStore(path.Join(root, id), kv.TSSWithMemTableSize(ttf.bufferSize), kv.TSSWithLogger(l.Named(id)), kv.TSSWithZSTDCompression(ttf.chunkSize)) if err != nil { return nil, fmt.Errorf("failed to create time series table: %w", err) } table := &tsTable{ bufferSize: ttf.bufferSize, l: l, position: position, sst: sst, BlockExpiryTracker: &blockExpiryTracker, } if table.IsActive() { if err := table.openBuffer(); err != nil { return nil, fmt.Errorf("failed to open buffer: %w", err) } } return table, nil }