// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package measure

import (
"errors"
"fmt"
"sync/atomic"
"time"
"github.com/dustin/go-humanize"
"github.com/apache/skywalking-banyandb/pkg/cgroups"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/watcher"
)
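
// mergeMaxConcurrencyCh bounds the number of merges that may run at once to
// the number of CPUs available to this process's cgroup.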
var mergeMaxConcurrencyCh = make(chan struct{}, cgroups.CPUs())
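
// mergeLoop runs in the background, watching flusher notifications and merging
// the parts of the latest snapshot whenever its epoch advances. It exits when
// the loop closer fires.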
func (tst *tsTable) mergeLoop(merges chan *mergerIntroduction, flusherNotifier watcher.Channel) {
defer tst.loopCloser.Done()
var epoch uint64
ew := flusherNotifier.Add(0, tst.loopCloser.CloseNotify())
if ew == nil {
return
}
var pwsChunk []*partWrapper
for {
select {
case <-tst.loopCloser.CloseNotify():
return
case <-ew.Watch():
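// The closure returns true when mergeLoop should exit.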
if func() bool {
curSnapshot := tst.currentSnapshot()
if curSnapshot == nil {
return false
}
defer curSnapshot.decRef()
if curSnapshot.epoch != epoch {
select {
case mergeMaxConcurrencyCh <- struct{}{}:
defer func() {
<-mergeMaxConcurrencyCh
}()
case <-tst.loopCloser.CloseNotify():
return true
}
tst.incTotalMergeLoopStarted(1)
defer tst.incTotalMergeLoopFinished(1)
var err error
if pwsChunk, err = tst.mergeSnapshot(curSnapshot, merges, pwsChunk[:0]); err != nil {
if errors.Is(err, errClosed) {
return true
}
tst.l.Logger.Warn().Err(err).Msgf("cannot merge snapshot: %d", curSnapshot.epoch)
tst.incTotalMergeLoopErr(1)
return false
}
epoch = curSnapshot.epoch
}
ew = flusherNotifier.Add(epoch, tst.loopCloser.CloseNotify())
return ew == nil
}() {
return
}
}
}
}
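
// mergeSnapshot picks mergeable parts from curSnapshot, constrained by the
// free disk space, and merges them when at least two candidates are found.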
func (tst *tsTable) mergeSnapshot(curSnapshot *snapshot, merges chan *mergerIntroduction, dst []*partWrapper) ([]*partWrapper, error) {
freeDiskSize := tst.freeDiskSpace(tst.root)
var toBeMerged map[uint64]struct{}
dst, toBeMerged = tst.getPartsToMerge(curSnapshot, freeDiskSize, dst)
if len(dst) < 2 {
return nil, nil
}
if _, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMerger, dst,
toBeMerged, merges, tst.loopCloser.CloseNotify(), "file"); err != nil {
return dst, err
}
return dst, nil
}
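
// mergePartsThenSendIntroduction merges parts into a single new part, records
// merge metrics, logs slow or unbalanced merges, and sends a
// mergerIntroduction so the new part replaces the merged ones in the next
// snapshot. It returns errClosed if the table shuts down first.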
func (tst *tsTable) mergePartsThenSendIntroduction(creator snapshotCreator, parts []*partWrapper, merged map[uint64]struct{}, merges chan *mergerIntroduction,
closeCh <-chan struct{}, typ string,
) (*partWrapper, error) {
reservedSpace := tst.reserveSpace(parts)
defer releaseDiskSpace(reservedSpace)
start := time.Now()
newPart, err := mergeParts(tst.fileSystem, closeCh, parts, atomic.AddUint64(&tst.curPartID, 1), tst.root)
if err != nil {
return nil, err
}
elapsed := time.Since(start)
tst.incTotalMergeLatency(elapsed.Seconds(), typ)
tst.incTotalMerged(1, typ)
tst.incTotalMergedParts(len(parts), typ)
if elapsed > 30*time.Second {
var totalCount uint64
for _, pw := range parts {
totalCount += pw.p.partMetadata.TotalCount
}
tst.l.Warn().
Uint64("beforeTotalCount", totalCount).
Uint64("afterTotalCount", newPart.p.partMetadata.TotalCount).
Int("beforePartCount", len(parts)).
Dur("elapsed", elapsed).
Msg("background merger takes too long")
} else if snapshotCreatorMerger == creator && tst.l.Info().Enabled() && len(parts) > 2 {
var minSize, maxSize, totalSize, totalCount uint64
for _, pw := range parts {
totalCount += pw.p.partMetadata.TotalCount
totalSize += pw.p.partMetadata.CompressedSizeBytes
if minSize == 0 || minSize > pw.p.partMetadata.CompressedSizeBytes {
minSize = pw.p.partMetadata.CompressedSizeBytes
}
if maxSize < pw.p.partMetadata.CompressedSizeBytes {
maxSize = pw.p.partMetadata.CompressedSizeBytes
}
}
if totalSize > 10<<20 && minSize*uint64(len(parts)) < maxSize {
// It's an unbalanced merge; that is acceptable when the size is small, so only large ones are logged.
tst.l.Info().
Str("beforeTotalCount", humanize.Comma(int64(totalCount))).
Str("afterTotalCount", humanize.Comma(int64(newPart.p.partMetadata.TotalCount))).
Int("beforePartCount", len(parts)).
Str("minSize", humanize.IBytes(minSize)).
Str("maxSize", humanize.IBytes(maxSize)).
Dur("elapsedMS", elapsed).
Msg("background merger merges unbalanced parts")
}
}
mi := generateMergerIntroduction()
defer releaseMergerIntroduction(mi)
mi.creator = creator
mi.newPart = newPart
mi.merged = merged
mi.applied = make(chan struct{})
select {
case merges <- mi:
case <-tst.loopCloser.CloseNotify():
return newPart, errClosed
}
select {
case <-mi.applied:
case <-tst.loopCloser.CloseNotify():
return newPart, errClosed
}
return newPart, nil
}
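
// freeDiskSpace reports the free space at path minus the globally reserved
// amount, clamped at zero.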
func (tst *tsTable) freeDiskSpace(path string) uint64 {
free := tst.fileSystem.MustGetFreeSpace(path)
reserved := atomic.LoadUint64(&reservedDiskSpace)
if free < reserved {
return 0
}
return free - reserved
}
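
// tryReserveDiskSpace reserves n bytes and keeps the reservation only while
// the actual free space still exceeds the total reserved; otherwise it rolls
// the reservation back and reports false.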
func (tst *tsTable) tryReserveDiskSpace(n uint64) bool {
available := tst.fileSystem.MustGetFreeSpace(tst.root)
reserved := reserveDiskSpace(n)
if available > reserved {
return true
}
releaseDiskSpace(n)
return false
}
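
// reserveDiskSpace adds n bytes to the global reservation counter and returns
// the new total. Callers pair it with releaseDiskSpace, as
// mergePartsThenSendIntroduction does:
//
//	reservedSpace := tst.reserveSpace(parts)
//	defer releaseDiskSpace(reservedSpace)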
func reserveDiskSpace(n uint64) uint64 {
return atomic.AddUint64(&reservedDiskSpace, n)
}
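
// releaseDiskSpace subtracts n bytes from the reservation counter; adding
// ^(n-1) is the two's-complement way to subtract from an unsigned atomic.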
func releaseDiskSpace(n uint64) {
atomic.AddUint64(&reservedDiskSpace, ^(n - 1))
}
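
// reservedDiskSpace is the package-level count of bytes reserved by in-flight
// merges.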
var reservedDiskSpace uint64
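
// getPartsToMerge filters out in-memory and empty parts, then lets the merge
// policy choose which of the remaining parts to merge within freeDiskSize.
// It returns the chosen parts and the set of their IDs.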
func (tst *tsTable) getPartsToMerge(snapshot *snapshot, freeDiskSize uint64, dst []*partWrapper) ([]*partWrapper, map[uint64]struct{}) {
var parts []*partWrapper
for _, pw := range snapshot.parts {
if pw.mp != nil || pw.p.partMetadata.TotalCount < 1 {
continue
}
parts = append(parts, pw)
}
dst = tst.option.mergePolicy.getPartsToMerge(dst, parts, freeDiskSize)
if len(dst) == 0 {
return nil, nil
}
toBeMerged := make(map[uint64]struct{})
for _, pw := range dst {
toBeMerged[pw.ID()] = struct{}{}
}
return dst, toBeMerged
}
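
// reserveSpace tries to reserve disk space equal to the total compressed size
// of the parts about to be merged; it returns the reserved size, or zero when
// the reservation fails.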
func (tst *tsTable) reserveSpace(parts []*partWrapper) uint64 {
var needSize uint64
for i := range parts {
needSize += parts[i].p.partMetadata.CompressedSizeBytes
}
if tst.tryReserveDiskSpace(needSize) {
return needSize
}
return 0
}
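
// errNoPartToMerge is returned when mergeParts receives an empty part list.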
var errNoPartToMerge = fmt.Errorf("no part to merge")
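
// mergeParts merges parts into a new file part identified by partID under
// root: it streams blocks from per-part iterators through a block reader into
// a block writer, persists the part metadata, syncs the directory, and
// reopens the result.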
func mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{}, parts []*partWrapper, partID uint64, root string) (*partWrapper, error) {
if len(parts) == 0 {
return nil, errNoPartToMerge
}
dstPath := partPath(root, partID)
pii := make([]*partMergeIter, 0, len(parts))
for i := range parts {
pmi := generatePartMergeIter()
pmi.mustInitFromPart(parts[i].p)
pii = append(pii, pmi)
}
br := generateBlockReader()
br.init(pii)
bw := generateBlockWriter()
bw.mustInitForFilePart(fileSystem, dstPath)
pm, err := mergeBlocks(closeCh, bw, br)
releaseBlockWriter(bw)
releaseBlockReader(br)
for i := range pii {
releasePartMergeIter(pii[i])
}
if err != nil {
return nil, err
}
pm.mustWriteMetadata(fileSystem, dstPath)
fileSystem.SyncPath(dstPath)
p := mustOpenFilePart(partID, root, fileSystem)
return newPartWrapper(nil, p), nil
}
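
// errClosed signals that the merger was shut down before an operation could
// complete.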
var errClosed = fmt.Errorf("the merger is closed")
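
// mergeBlocks merges the blocks yielded by br in order, combining blocks of
// the same series that may overlap in time and keeping the output within
// maxBlockLength rows and maxUncompressedBlockSize bytes. It aborts with
// errClosed once closeCh fires.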
func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) (*partMetadata, error) {
pendingBlockIsEmpty := true
pendingBlock := generateBlockPointer()
defer releaseBlockPointer(pendingBlock)
var tmpBlock, tmpBlock2 *blockPointer
var decoder *encoding.BytesBlockDecoder
getDecoder := func() *encoding.BytesBlockDecoder {
if decoder == nil {
decoder = generateColumnValuesDecoder()
}
return decoder
}
releaseDecoder := func() {
if decoder != nil {
releaseColumnValuesDecoder(decoder)
decoder = nil
}
}
for br.nextBlockMetadata() {
select {
case <-closeCh:
return nil, errClosed
default:
}
b := br.block
if pendingBlockIsEmpty {
br.loadBlockData(getDecoder())
pendingBlock.copyFrom(b)
pendingBlockIsEmpty = false
continue
}
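// The incoming block starts a different series, or the pending block is full
// and ends no later than the incoming one begins: flush the pending block
// unchanged.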
if pendingBlock.bm.seriesID != b.bm.seriesID ||
(pendingBlock.isFull() && pendingBlock.bm.timestamps.max <= b.bm.timestamps.min) {
bw.mustWriteBlock(pendingBlock.bm.seriesID, &pendingBlock.block)
releaseDecoder()
br.loadBlockData(getDecoder())
pendingBlock.copyFrom(b)
continue
}
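// The blocks belong to the same series and may overlap: merge pendingBlock
// with the incoming block into tmpBlock.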
if tmpBlock == nil {
tmpBlock = generateBlockPointer()
defer releaseBlockPointer(tmpBlock)
}
tmpBlock.reset()
tmpBlock.bm.seriesID = b.bm.seriesID
br.loadBlockData(getDecoder())
mergeTwoBlocks(tmpBlock, pendingBlock, b)
if len(tmpBlock.timestamps) <= maxBlockLength && tmpBlock.uncompressedSizeBytes() <= maxUncompressedBlockSize {
if len(tmpBlock.timestamps) == 0 {
pendingBlockIsEmpty = true
}
pendingBlock, tmpBlock = tmpBlock, pendingBlock
continue
}
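// The merged rows fit maxBlockLength but exceed the uncompressed size limit:
// write the merged block out whole and start a fresh pending block.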
if len(tmpBlock.timestamps) <= maxBlockLength {
bw.mustWriteBlock(tmpBlock.bm.seriesID, &tmpBlock.block)
pendingBlock.reset()
pendingBlockIsEmpty = true
releaseDecoder()
continue
}
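// The merged block holds more than maxBlockLength rows: keep the tail as the
// new pending block and write out the first maxBlockLength rows.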
tmpBlock.idx = maxBlockLength
pendingBlock.copyFrom(tmpBlock)
l := tmpBlock.idx
tmpBlock.idx = 0
if tmpBlock2 == nil {
tmpBlock2 = generateBlockPointer()
defer releaseBlockPointer(tmpBlock2)
}
tmpBlock2.reset()
tmpBlock2.append(tmpBlock, l)
bw.mustWriteBlock(tmpBlock.bm.seriesID, &tmpBlock2.block)
releaseDecoder()
}
if err := br.error(); err != nil {
return nil, fmt.Errorf("cannot read block to merge: %w", err)
}
if !pendingBlockIsEmpty {
bw.mustWriteBlock(pendingBlock.bm.seriesID, &pendingBlock.block)
}
releaseDecoder()
var result partMetadata
bw.Flush(&result)
return &result, nil
}
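
// mergeTwoBlocks merges left and right into target in ascending timestamp
// order. Rows sharing a timestamp are deduplicated, keeping the row with the
// higher version; ties favor left.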
func mergeTwoBlocks(target, left, right *blockPointer) {
appendIfEmpty := func(ib1, ib2 *blockPointer) bool {
if ib1.idx >= len(ib1.timestamps) {
target.appendAll(ib2)
return true
}
return false
}
defer target.updateMetadata()
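// Fast paths: the blocks do not overlap in time, so append them wholesale.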
if left.bm.timestamps.max < right.bm.timestamps.min {
target.appendAll(left)
target.appendAll(right)
return
}
if right.bm.timestamps.max < left.bm.timestamps.min {
target.appendAll(right)
target.appendAll(left)
return
}
if appendIfEmpty(left, right) || appendIfEmpty(right, left) {
return
}
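// General case: repeatedly append from left the run of rows whose timestamps
// do not exceed right's next timestamp, then swap sides.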
for {
i := left.idx
ts2 := right.timestamps[right.idx]
for i < len(left.timestamps) && left.timestamps[i] <= ts2 {
i++
}
if left.timestamps[i-1] == ts2 {
if left.versions[i-1] >= right.versions[right.idx] {
target.append(left, i)
} else {
target.append(left, i-1) // skip left
target.append(right, right.idx+1)
}
left.idx = i
right.idx++ // skip right
if appendIfEmpty(right, left) {
return
}
} else {
target.append(left, i)
left.idx = i
}
if appendIfEmpty(left, right) {
return
}
left, right = right, left
}
}