banyand/measure/tstable.go (202 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package measure import ( "fmt" "io" "path" "sync" "sync/atomic" "time" "github.com/dgraph-io/badger/v3" "github.com/dgraph-io/badger/v3/skl" "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/kv" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" ) const ( defaultNumBufferShards = 2 defaultWriteConcurrency = 1000 plain = "tst" encoded = "encoded" ) var _ tsdb.TSTable = (*tsTable)(nil) type tsTable struct { encoderSST kv.TimeSeriesStore sst kv.TimeSeriesStore *tsdb.BlockExpiryTracker l *logger.Logger encoderBuffer *tsdb.Buffer buffer *tsdb.Buffer closeBufferTimer *time.Timer position common.Position bufferSize int64 encoderBufferSize int64 lock sync.Mutex } func (t *tsTable) SizeOnDisk() int64 { return t.encoderSST.SizeOnDisk() } func (t *tsTable) openBuffer() (err error) { t.lock.Lock() defer t.lock.Unlock() if t.encoderBuffer != nil { return nil } bufferSize := int(t.encoderBufferSize / defaultNumBufferShards) if t.encoderBuffer, err = tsdb.NewBuffer(t.l, t.position, bufferSize, defaultWriteConcurrency, defaultNumBufferShards, t.encoderFlush); err != nil { return fmt.Errorf("failed to create encoder buffer: %w", err) } bufferSize = int(t.bufferSize / defaultNumBufferShards) if t.buffer, err = tsdb.NewBuffer(t.l, t.position, bufferSize, defaultWriteConcurrency, defaultNumBufferShards, t.flush); err != nil { return fmt.Errorf("failed to create buffer: %w", err) } end := t.EndTime() now := time.Now() closeAfter := end.Sub(now) if now.After(end) { closeAfter = t.BlockExpiryDuration() } t.closeBufferTimer = time.AfterFunc(closeAfter, func() { if t.l.Debug().Enabled() { t.l.Debug().Msg("closing buffer") } t.lock.Lock() defer t.lock.Unlock() if t.encoderBuffer != nil { if err := t.encoderBuffer.Close(); err != nil { t.l.Error().Err(err).Msg("close encoder buffer error") } t.encoderBuffer = nil } if t.buffer != nil { if err := t.buffer.Close(); err != nil { t.l.Error().Err(err).Msg("close buffer error") } t.buffer = nil } }) return nil } func (t *tsTable) Close() (err error) { t.lock.Lock() defer t.lock.Unlock() for _, b := range []io.Closer{t.encoderBuffer, t.buffer, t.sst, t.encoderSST} { if b != nil { err = multierr.Append(err, b.Close()) } } return err } func (t *tsTable) CollectStats() *badger.Statistics { mergedMap := &sync.Map{} for _, s := range []*badger.Statistics{t.encoderSST.CollectStats(), t.sst.CollectStats()} { if s != nil && s.TableBuilderSize != nil { s.TableBuilderSize.Range(func(key, value interface{}) bool { val := value.(*atomic.Int64) if val.Load() > 0 { mergedMap.Store(key, val) } return true }) } } return &badger.Statistics{ TableBuilderSize: mergedMap, } } func (t *tsTable) Get(key []byte, ts time.Time) ([]byte, error) { if t.toEncode(key) { if v, ok := t.encoderBuffer.Read(key, ts); ok { return v, nil } return t.encoderSST.Get(key, uint64(ts.UnixNano())) } if v, ok := t.buffer.Read(key, ts); ok { return v, nil } return t.sst.Get(key, uint64(ts.UnixNano())) } func (t *tsTable) Put(key []byte, val []byte, ts time.Time) error { if t.encoderBuffer != nil { t.writeToBuffer(key, val, ts) return nil } if err := t.openBuffer(); err != nil { return err } t.writeToBuffer(key, val, ts) return nil } func (t *tsTable) writeToBuffer(key []byte, val []byte, ts time.Time) { if t.toEncode(key) { t.encoderBuffer.Write(key, val, ts) } else { t.buffer.Write(key, val, ts) } } func (t *tsTable) encoderFlush(shardIndex int, skl *skl.Skiplist) error { t.l.Info().Int("shard", shardIndex).Msg("flushing encoder buffer") return t.encoderSST.Handover(skl) } func (t *tsTable) flush(shardIndex int, skl *skl.Skiplist) error { t.l.Info().Int("shard", shardIndex).Msg("flushing buffer") return t.sst.Handover(skl) } func (t *tsTable) toEncode(key []byte) bool { fieldSpec, _, err := pbv1.DecodeFieldFlag(key) if err != nil { t.l.Err(err).Msg("failed to decode field flag") } return fieldSpec.EncodingMethod == databasev1.EncodingMethod_ENCODING_METHOD_GORILLA } var _ tsdb.TSTableFactory = (*tsTableFactory)(nil) type tsTableFactory struct { encoderPool encoding.SeriesEncoderPool decoderPool encoding.SeriesDecoderPool bufferSize int64 encoderBufferSize int64 plainChunkSize int64 encodingChunkSize int compressionMethod databasev1.CompressionMethod } func (ttf *tsTableFactory) NewTSTable(blockExpiryTracker tsdb.BlockExpiryTracker, root string, position common.Position, l *logger.Logger) (tsdb.TSTable, error) { encoderSST, err := kv.OpenTimeSeriesStore( path.Join(root, encoded), kv.TSSWithMemTableSize(ttf.bufferSize), kv.TSSWithLogger(l.Named(encoded)), kv.TSSWithEncoding(ttf.encoderPool, ttf.decoderPool, ttf.encodingChunkSize), ) if err != nil { return nil, fmt.Errorf("failed to create time series table: %w", err) } sst, err := kv.OpenTimeSeriesStore( path.Join(root, plain), kv.TSSWithMemTableSize(ttf.bufferSize), kv.TSSWithLogger(l.Named(plain)), kv.TSSWithZSTDCompression(int(ttf.plainChunkSize)), ) if err != nil { return nil, fmt.Errorf("failed to create time series table: %w", err) } table := &tsTable{ bufferSize: ttf.bufferSize, encoderBufferSize: ttf.encoderBufferSize, l: l, position: position, encoderSST: encoderSST, sst: sst, BlockExpiryTracker: &blockExpiryTracker, } if table.IsActive() { if err := table.openBuffer(); err != nil { return nil, fmt.Errorf("failed to open buffer: %w", err) } } return table, nil }