banyand/measure/flusher.go (156 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package measure
import (
"errors"
"math"
"time"
"github.com/apache/skywalking-banyandb/pkg/watcher"
)
// flusherLoop is the long-running flush loop of a tsTable.
//
// Every time the watcher registered on introducerWatcher fires, one round is
// executed: the loop optionally pauses (see pauseFlusherToPileupMemParts),
// picks up the table's snapshot if its epoch is newer than the last processed
// one, tries to merge the snapshot's in-memory parts and, when no merge took
// place, flushes them instead.
//
// flushCh and mergeCh receive the introductions produced by flush and
// mergeMemParts. flusherWatcher delivers watchers that are collected and
// notified once a round has been processed. epoch is the last epoch this loop
// treats as already handled; it is advanced after each successful round.
//
// The loop ends when tst.loopCloser signals close or when introducerWatcher.Add
// returns nil. tst.loopCloser.Done is always called on exit.
func (tst *tsTable) flusherLoop(flushCh chan *flusherIntroduction, mergeCh chan *mergerIntroduction, introducerWatcher, flusherWatcher watcher.Channel, epoch uint64) {
defer tst.loopCloser.Done()
// Register for a notification relative to the starting epoch. A nil watcher
// stops the loop before it begins (presumably the closer fired first —
// confirm against pkg/watcher).
epochWatcher := introducerWatcher.Add(epoch, tst.loopCloser.CloseNotify())
if epochWatcher == nil {
return
}
// Watchers received on flusherWatcher that still have to be notified.
var flusherWatchers watcher.Epochs
for {
select {
case <-tst.loopCloser.CloseNotify():
return
case e := <-flusherWatcher:
// Only collect the watcher here; it is notified during a flush round.
flusherWatchers.Add(e)
case <-epochWatcher.Watch():
// One flush round. The closure returns true when the whole loop must stop;
// wrapping the round in a function lets the defers below run per round.
if func() bool {
tst.incTotalFlushLoopStarted(1)
start := time.Now()
defer func() {
tst.incTotalFlushLoopFinished(1)
tst.incTotalFlushLatency(time.Since(start).Seconds())
}()
// If a snapshot exists, pause first. The snapshot obtained here is released
// right away and re-read below, after the pause.
curSnapshot := tst.currentSnapshot()
if curSnapshot != nil {
flusherWatchers = tst.pauseFlusherToPileupMemParts(epoch, flusherWatcher, flusherWatchers)
curSnapshot.decRef()
curSnapshot = nil
}
// Take a reference on the table's snapshot only when it is newer than the
// last processed epoch; otherwise there is nothing to merge or flush.
tst.RLock()
if tst.snapshot != nil && tst.snapshot.epoch > epoch {
curSnapshot = tst.snapshot
curSnapshot.incRef()
}
tst.RUnlock()
if curSnapshot != nil {
defer curSnapshot.decRef()
merged, err := tst.mergeMemParts(curSnapshot, mergeCh)
if err != nil {
// A failed merge is logged and counted; the loop keeps running and epoch
// is left unchanged.
tst.l.Logger.Warn().Err(err).Msgf("cannot merge snapshot: %d", curSnapshot.epoch)
tst.incTotalFlushLoopErr(1)
return false
}
// Flush only when mergeMemParts reported that it did not handle the parts.
if !merged {
tst.flush(curSnapshot, flushCh)
}
epoch = curSnapshot.epoch
// Notify merger to start a new round of merge.
// This round might have been triggered in pauseFlusherToPileupMemParts.
flusherWatchers.Notify(math.MaxUint64)
flusherWatchers = nil
// The table already moved past the epoch just processed: count the progress
// and return without registering a new watcher.
// NOTE(review): this appears to rely on the current epochWatcher firing
// again on the next select — confirm against pkg/watcher.
if tst.currentEpoch() != epoch {
tst.incTotalFlushLoopProgress(1)
return false
}
}
// Wait for the next epoch change; a nil watcher terminates the loop.
epochWatcher = introducerWatcher.Add(epoch, tst.loopCloser.CloseNotify())
return epochWatcher == nil
}() {
return
}
}
}
}
// pauseFlusherToPileupMemParts delays the flusher so that more in-memory parts
// can pile up before the next merge/flush round.
//
// The pause is skipped entirely when the table has no current snapshot.
// Otherwise the collected flusherWatchers are notified with epoch and the
// function blocks until one of the following happens:
//   - tst.loopCloser signals close;
//   - tst.option.flushTimeout elapses, which is counted as a completed pause;
//   - a watcher arrives on flushWatcher (presumably sent when a merge has
//     finished — confirm against the sender), which is counted as a pause
//     break; the watcher is added to the set and the set is notified with
//     epoch again.
//
// It returns the, possibly extended, set of watchers.
func (tst *tsTable) pauseFlusherToPileupMemParts(epoch uint64, flushWatcher watcher.Channel, flusherWatchers watcher.Epochs) watcher.Epochs {
	curSnapshot := tst.currentSnapshot()
	if curSnapshot == nil {
		return flusherWatchers
	}
	// Only the existence of a snapshot matters here, so release it immediately.
	curSnapshot.decRef()
	flusherWatchers.Notify(epoch)

	// An explicit timer (rather than time.After) is stopped as soon as the
	// pause ends early, so it does not linger until flushTimeout expires.
	timer := time.NewTimer(tst.option.flushTimeout)
	defer timer.Stop()

	select {
	case <-tst.loopCloser.CloseNotify():
	case <-timer.C:
		tst.incTotalFlushPauseCompleted(1)
	case e := <-flushWatcher:
		flusherWatchers.Add(e)
		flusherWatchers.Notify(epoch)
		tst.incTotalFlushPauseBreak(1)
	}
	return flusherWatchers
}
// mergeMemParts merges all in-memory parts of snp into a single part and sends
// the resulting introduction through mergeCh.
//
// It returns true when the caller should treat the in-memory parts as handled:
// either a merged part was produced, or the merge reported errClosed. It
// returns false with a nil error when fewer than two in-memory parts exist or
// when the merge produced no part. Any other merge error is returned as is.
func (tst *tsTable) mergeMemParts(snp *snapshot, mergeCh chan *mergerIntroduction) (bool, error) {
	var inMemory []*partWrapper
	ids := make(map[uint64]struct{})
	for _, pw := range snp.parts {
		if pw.mp == nil {
			// Not an in-memory part.
			continue
		}
		inMemory = append(inMemory, pw)
		ids[pw.ID()] = struct{}{}
	}
	// A merge needs at least two inputs.
	if len(inMemory) < 2 {
		return false, nil
	}

	// The merge is given its own close channel, which is closed only after the
	// call returns, so that tsTable.close cannot interrupt a merge of memory
	// parts.
	done := make(chan struct{})
	merged, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, inMemory,
		ids, mergeCh, done, "mem")
	close(done)

	switch {
	case err == nil:
		return merged != nil, nil
	case errors.Is(err, errClosed):
		return true, nil
	default:
		return false, err
	}
}
// flush writes every non-empty in-memory part of snapshot to the table's file
// system, reopens each one as a file-backed part under the same ID, and sends
// the collected parts to flushCh as a single flusher introduction.
//
// Nothing is sent when no part was flushed. After sending, flush waits until
// the introduction is marked applied or tst.loopCloser signals close. The
// introduction is released when flush returns.
func (tst *tsTable) flush(snapshot *snapshot, flushCh chan *flusherIntroduction) {
	intro := generateFlusherIntroduction()
	defer releaseFlusherIntroduction(intro)

	began := time.Now()
	var flushedMemParts int
	for _, memPW := range snapshot.parts {
		// Skip parts that are not in memory or that hold no data.
		if memPW.mp == nil || memPW.mp.partMetadata.TotalCount < 1 {
			continue
		}
		flushedMemParts++
		id := memPW.ID()
		memPW.mp.mustFlush(tst.fileSystem, partPath(tst.root, id))
		// Reopen the just-written part from disk and keep the original ID.
		filePW := newPartWrapper(nil, mustOpenFilePart(id, tst.root, tst.fileSystem))
		filePW.p.partMetadata.ID = id
		intro.flushed[filePW.ID()] = filePW
	}
	if len(intro.flushed) == 0 {
		return
	}

	flushedAt := time.Now()
	tst.incTotalFlushed(1)
	tst.incTotalFlushedMemParts(flushedMemParts)
	tst.incTotalFlushLatency(flushedAt.Sub(began).Seconds())

	// Hand the introduction over, unless the table is closing.
	intro.applied = make(chan struct{})
	select {
	case flushCh <- intro:
	case <-tst.loopCloser.CloseNotify():
		return
	}
	// Block until the introduction has been applied or the table is closing.
	select {
	case <-intro.applied:
	case <-tst.loopCloser.CloseNotify():
	}
	tst.incTotalFlushIntroLatency(time.Since(flushedAt).Seconds())
}
// persistSnapshot writes the snapshot's epoch together with the names of all
// its parts via mustWriteSnapshot and then registers the snapshot with the
// table's gc.
func (tst *tsTable) persistSnapshot(snapshot *snapshot) {
	// names stays nil when the snapshot has no parts.
	var names []string
	for _, pw := range snapshot.parts {
		names = append(names, partName(pw.ID()))
	}
	tst.mustWriteSnapshot(snapshot.epoch, names)
	tst.gc.registerSnapshot(snapshot)
}