banyand/measure/introducer.go (172 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package measure
import (
"github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/watcher"
)
type introduction struct {
memPart *partWrapper
applied chan struct{}
}
func (i *introduction) reset() {
i.memPart = nil
i.applied = nil
}
var introductionPool = pool.Register[*introduction]("measure-introduction")
func generateIntroduction() *introduction {
v := introductionPool.Get()
if v == nil {
return &introduction{}
}
i := v
i.reset()
return i
}
func releaseIntroduction(i *introduction) {
introductionPool.Put(i)
}
type flusherIntroduction struct {
flushed map[uint64]*partWrapper
applied chan struct{}
}
func (i *flusherIntroduction) reset() {
for k := range i.flushed {
delete(i.flushed, k)
}
i.applied = nil
}
var flusherIntroductionPool = pool.Register[*flusherIntroduction]("measure-flusher-introduction")
func generateFlusherIntroduction() *flusherIntroduction {
v := flusherIntroductionPool.Get()
if v == nil {
return &flusherIntroduction{
flushed: make(map[uint64]*partWrapper),
}
}
i := v
i.reset()
return i
}
func releaseFlusherIntroduction(i *flusherIntroduction) {
flusherIntroductionPool.Put(i)
}
type mergerIntroduction struct {
merged map[uint64]struct{}
newPart *partWrapper
applied chan struct{}
creator snapshotCreator
}
func (i *mergerIntroduction) reset() {
for k := range i.merged {
delete(i.merged, k)
}
i.newPart = nil
i.applied = nil
i.creator = 0
}
var mergerIntroductionPool = pool.Register[*mergerIntroduction]("measure-merger-introduction")
func generateMergerIntroduction() *mergerIntroduction {
v := mergerIntroductionPool.Get()
if v == nil {
return &mergerIntroduction{}
}
i := v
i.reset()
return i
}
func releaseMergerIntroduction(i *mergerIntroduction) {
mergerIntroductionPool.Put(i)
}
func (tst *tsTable) introducerLoop(flushCh chan *flusherIntroduction, mergeCh chan *mergerIntroduction, watcherCh watcher.Channel, epoch uint64) {
var introducerWatchers watcher.Epochs
defer tst.loopCloser.Done()
for {
select {
case <-tst.loopCloser.CloseNotify():
return
case next := <-tst.introductions:
tst.incTotalIntroduceLoopStarted(1, "mem")
tst.introduceMemPart(next, epoch)
tst.incTotalIntroduceLoopFinished(1, "mem")
epoch++
case next := <-flushCh:
tst.incTotalIntroduceLoopStarted(1, "flush")
tst.introduceFlushed(next, epoch)
tst.incTotalIntroduceLoopFinished(1, "flush")
tst.gc.clean()
epoch++
case next := <-mergeCh:
tst.incTotalIntroduceLoopStarted(1, "merge")
tst.introduceMerged(next, epoch)
tst.incTotalIntroduceLoopFinished(1, "merge")
tst.gc.clean()
epoch++
case epochWatcher := <-watcherCh:
introducerWatchers.Add(epochWatcher)
}
curEpoch := tst.currentEpoch()
introducerWatchers.Notify(curEpoch)
}
}
func (tst *tsTable) introduceMemPart(nextIntroduction *introduction, epoch uint64) {
cur := tst.currentSnapshot()
if cur != nil {
defer cur.decRef()
} else {
cur = new(snapshot)
}
next := nextIntroduction.memPart
nextSnp := cur.copyAllTo(epoch)
nextSnp.parts = append(nextSnp.parts, next)
nextSnp.creator = snapshotCreatorMemPart
tst.replaceSnapshot(&nextSnp, false)
if nextIntroduction.applied != nil {
close(nextIntroduction.applied)
}
}
func (tst *tsTable) introduceFlushed(nextIntroduction *flusherIntroduction, epoch uint64) {
cur := tst.currentSnapshot()
if cur == nil {
tst.l.Panic().Msg("current snapshot is nil")
}
defer cur.decRef()
nextSnp := cur.merge(epoch, nextIntroduction.flushed)
nextSnp.creator = snapshotCreatorFlusher
tst.replaceSnapshot(&nextSnp, true)
if nextIntroduction.applied != nil {
close(nextIntroduction.applied)
}
}
func (tst *tsTable) introduceMerged(nextIntroduction *mergerIntroduction, epoch uint64) {
cur := tst.currentSnapshot()
if cur == nil {
tst.l.Panic().Msg("current snapshot is nil")
return
}
defer cur.decRef()
nextSnp := cur.remove(epoch, nextIntroduction.merged)
nextSnp.parts = append(nextSnp.parts, nextIntroduction.newPart)
nextSnp.creator = nextIntroduction.creator
tst.replaceSnapshot(&nextSnp, true)
if nextIntroduction.applied != nil {
close(nextIntroduction.applied)
}
}
func (tst *tsTable) replaceSnapshot(next *snapshot, persisted bool) {
tst.Lock()
defer tst.Unlock()
if tst.snapshot != nil {
tst.snapshot.decRef()
}
tst.snapshot = next
if persisted {
tst.persistSnapshot(next)
}
}
func (tst *tsTable) currentEpoch() uint64 {
s := tst.currentSnapshot()
if s == nil {
return 0
}
defer s.decRef()
return s.epoch
}