banyand/measure/tstable.go (371 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package measure
import (
"container/heap"
"encoding/json"
"errors"
"fmt"
"io"
"path/filepath"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
"github.com/apache/skywalking-banyandb/pkg/watcher"
)
const (
snapshotSuffix = ".snp"
)
func newTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position,
l *logger.Logger, _ timestamp.TimeRange, option option, m any,
) (*tsTable, error) {
tst := tsTable{
fileSystem: fileSystem,
root: rootPath,
option: option,
l: l,
p: p,
}
if m != nil {
tst.metrics = m.(*metrics)
}
tst.gc.init(&tst)
ee := fileSystem.ReadDir(rootPath)
if len(ee) == 0 {
t := &tst
t.startLoop(uint64(time.Now().UnixNano()))
return t, nil
}
var loadedParts []uint64
var loadedSnapshots []uint64
var needToDelete []string
for i := range ee {
if ee[i].IsDir() {
p, err := parseEpoch(ee[i].Name())
if err != nil {
l.Info().Err(err).Msg("cannot parse part file name. skip and delete it")
needToDelete = append(needToDelete, ee[i].Name())
continue
}
err = validatePartMetadata(fileSystem, filepath.Join(rootPath, ee[i].Name()))
if err != nil {
l.Info().Err(err).Msg("cannot validate part metadata. skip and delete it")
needToDelete = append(needToDelete, ee[i].Name())
continue
}
loadedParts = append(loadedParts, p)
continue
}
if filepath.Ext(ee[i].Name()) != snapshotSuffix {
continue
}
snapshot, err := parseSnapshot(ee[i].Name())
if err != nil {
l.Info().Err(err).Msg("cannot parse snapshot file name. skip and delete it")
needToDelete = append(needToDelete, ee[i].Name())
continue
}
loadedSnapshots = append(loadedSnapshots, snapshot)
}
for i := range needToDelete {
l.Info().Str("path", filepath.Join(rootPath, needToDelete[i])).Msg("delete invalid directory or file")
fileSystem.MustRMAll(filepath.Join(rootPath, needToDelete[i]))
}
if len(loadedParts) == 0 || len(loadedSnapshots) == 0 {
t := &tst
t.startLoop(uint64(time.Now().UnixNano()))
return t, nil
}
sort.Slice(loadedSnapshots, func(i, j int) bool {
return loadedSnapshots[i] > loadedSnapshots[j]
})
epoch := loadedSnapshots[0]
t := &tst
t.loadSnapshot(epoch, loadedParts)
t.startLoop(epoch)
return t, nil
}
type tsTable struct {
fileSystem fs.FileSystem
l *logger.Logger
snapshot *snapshot
introductions chan *introduction
loopCloser *run.Closer
*metrics
p common.Position
option option
root string
gc garbageCleaner
curPartID uint64
sync.RWMutex
}
func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) {
parts := tst.mustReadSnapshot(epoch)
snp := snapshot{
epoch: epoch,
}
needToPersist := false
for _, id := range loadedParts {
var find bool
for j := range parts {
if id == parts[j] {
find = true
break
}
}
if !find {
tst.gc.removePart(id)
continue
}
err := validatePartMetadata(tst.fileSystem, partPath(tst.root, id))
if err != nil {
tst.l.Info().Err(err).Uint64("id", id).Msg("cannot validate part metadata. skip and delete it")
tst.gc.removePart(id)
needToPersist = true
continue
}
p := mustOpenFilePart(id, tst.root, tst.fileSystem)
p.partMetadata.ID = id
snp.parts = append(snp.parts, newPartWrapper(nil, p))
if tst.curPartID < id {
tst.curPartID = id
}
}
tst.gc.registerSnapshot(&snp)
tst.gc.clean()
if len(snp.parts) < 1 {
return
}
snp.incRef()
tst.snapshot = &snp
if needToPersist {
tst.persistSnapshot(&snp)
}
}
func (tst *tsTable) startLoop(cur uint64) {
tst.loopCloser = run.NewCloser(1 + 3)
tst.introductions = make(chan *introduction)
flushCh := make(chan *flusherIntroduction)
mergeCh := make(chan *mergerIntroduction)
introducerWatcher := make(watcher.Channel, 1)
flusherWatcher := make(watcher.Channel, 1)
go tst.introducerLoop(flushCh, mergeCh, introducerWatcher, cur+1)
go tst.flusherLoop(flushCh, mergeCh, introducerWatcher, flusherWatcher, cur)
go tst.mergeLoop(mergeCh, flusherWatcher)
}
func parseEpoch(epochStr string) (uint64, error) {
p, err := strconv.ParseUint(epochStr, 16, 64)
if err != nil {
return 0, fmt.Errorf("cannot parse path %s: %w", epochStr, err)
}
return p, nil
}
func (tst *tsTable) mustWriteSnapshot(snapshot uint64, partNames []string) {
data, err := json.Marshal(partNames)
if err != nil {
logger.Panicf("cannot marshal partNames to JSON: %s", err)
}
snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
lf, err := tst.fileSystem.CreateLockFile(snapshotPath, storage.FilePerm)
if err != nil {
logger.Panicf("cannot create lock file %s: %s", snapshotPath, err)
}
n, err := lf.Write(data)
if err != nil {
logger.Panicf("cannot write snapshot %s: %s", snapshotPath, err)
}
if n != len(data) {
logger.Panicf("unexpected number of bytes written to %s; got %d; want %d", snapshotPath, n, len(data))
}
}
func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 {
snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
data, err := tst.fileSystem.Read(snapshotPath)
if err != nil {
logger.Panicf("cannot read %s: %s", snapshotPath, err)
}
var partNames []string
if err := json.Unmarshal(data, &partNames); err != nil {
logger.Panicf("cannot parse %s: %s", snapshotPath, err)
}
var result []uint64
for i := range partNames {
e, err := parseEpoch(partNames[i])
if err != nil {
logger.Panicf("cannot parse %s: %s", partNames[i], err)
}
result = append(result, e)
}
return result
}
func (tst *tsTable) Close() error {
if tst.loopCloser != nil {
tst.loopCloser.Done()
tst.loopCloser.CloseThenWait()
}
tst.Lock()
defer tst.Unlock()
tst.deleteMetrics()
if tst.snapshot == nil {
return nil
}
tst.snapshot.decRef()
tst.snapshot = nil
return nil
}
func (tst *tsTable) mustAddDataPoints(dps *dataPoints) {
if len(dps.seriesIDs) == 0 {
return
}
mp := generateMemPart()
mp.mustInitFromDataPoints(dps)
p := openMemPart(mp)
ind := generateIntroduction()
defer releaseIntroduction(ind)
ind.applied = make(chan struct{})
ind.memPart = newPartWrapper(mp, p)
ind.memPart.p.partMetadata.ID = atomic.AddUint64(&tst.curPartID, 1)
startTime := time.Now()
select {
case tst.introductions <- ind:
case <-tst.loopCloser.CloseNotify():
return
}
select {
case <-ind.applied:
case <-tst.loopCloser.CloseNotify():
}
tst.incTotalWritten(len(dps.timestamps))
tst.incTotalBatch(1)
tst.incTotalBatchIntroLatency(time.Since(startTime).Seconds())
}
type tstIter struct {
err error
parts []*part
piPool []partIter
piHeap partIterHeap
nextBlockNoop bool
}
func (ti *tstIter) reset() {
for i := range ti.parts {
ti.parts[i] = nil
}
ti.parts = ti.parts[:0]
for i := range ti.piPool {
ti.piPool[i].reset()
}
ti.piPool = ti.piPool[:0]
for i := range ti.piHeap {
ti.piHeap[i] = nil
}
ti.piHeap = ti.piHeap[:0]
ti.err = nil
ti.nextBlockNoop = false
}
func (ti *tstIter) init(bma *blockMetadataArray, parts []*part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) {
ti.reset()
ti.parts = parts
if n := len(ti.parts) - cap(ti.piPool); n > 0 {
ti.piPool = append(ti.piPool[:cap(ti.piPool)], make([]partIter, n)...)
}
ti.piPool = ti.piPool[:len(ti.parts)]
for i, p := range ti.parts {
ti.piPool[i].init(bma, p, sids, minTimestamp, maxTimestamp)
}
ti.piHeap = ti.piHeap[:0]
for i := range ti.piPool {
ps := &ti.piPool[i]
if !ps.nextBlock() {
if err := ps.error(); err != nil {
ti.err = fmt.Errorf("cannot initialize tsTable iteration: %w", err)
return
}
continue
}
ti.piHeap = append(ti.piHeap, ps)
}
if len(ti.piHeap) == 0 {
ti.err = io.EOF
return
}
heap.Init(&ti.piHeap)
ti.nextBlockNoop = true
}
func (ti *tstIter) nextBlock() bool {
if ti.err != nil {
return false
}
if ti.nextBlockNoop {
ti.nextBlockNoop = false
return true
}
ti.err = ti.next()
if ti.err != nil {
if errors.Is(ti.err, io.EOF) {
ti.err = fmt.Errorf("cannot obtain the next block to search in the partition: %w", ti.err)
}
return false
}
return true
}
func (ti *tstIter) next() error {
psMin := ti.piHeap[0]
if psMin.nextBlock() {
heap.Fix(&ti.piHeap, 0)
return nil
}
if err := psMin.error(); err != nil {
return err
}
heap.Pop(&ti.piHeap)
if len(ti.piHeap) == 0 {
return io.EOF
}
return nil
}
func (ti *tstIter) Error() error {
if errors.Is(ti.err, io.EOF) {
return nil
}
return ti.err
}
func generateTstIter() *tstIter {
v := tstIterPool.Get()
if v == nil {
return &tstIter{}
}
return v
}
func releaseTstIter(ti *tstIter) {
ti.reset()
tstIterPool.Put(ti)
}
var tstIterPool = pool.Register[*tstIter]("measure-tstIter")
type partIterHeap []*partIter
func (pih *partIterHeap) Len() int {
return len(*pih)
}
func (pih *partIterHeap) Less(i, j int) bool {
x := *pih
return x[i].curBlock.less(x[j].curBlock)
}
func (pih *partIterHeap) Swap(i, j int) {
x := *pih
x[i], x[j] = x[j], x[i]
}
func (pih *partIterHeap) Push(x any) {
*pih = append(*pih, x.(*partIter))
}
func (pih *partIterHeap) Pop() any {
a := *pih
v := a[len(a)-1]
*pih = a[:len(a)-1]
return v
}