banyand/tsdb/shard.go (278 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package tsdb
import (
"context"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/pkg/errors"
"github.com/shirou/gopsutil/v3/disk"
"go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
const (
	// defaultBlockQueueSize is the fallback number of simultaneously opened
	// blocks when the caller supplies a non-positive queue size.
	defaultBlockQueueSize = 2
	// defaultMaxBlockQueueSize is the default hard cap on the opened-block queue.
	defaultMaxBlockQueueSize = 64
	// defaultKVMemorySize is 4 MiB — presumably the in-memory budget for the
	// shard's KV stores; its use is outside this chunk (verify at call sites).
	defaultKVMemorySize = 4 << 20
)
var (
	// Compile-time assertion that *shard implements the Shard interface.
	_ Shard = (*shard)(nil)

	// shardProvider scopes every shard metric under the tsdb meter.
	shardProvider = observability.NewMeterProvider(meterTSDB.SubScope("shard"))
	// diskStateGetter fetches filesystem usage stats; kept as a variable
	// (rather than calling disk.UsageWithContext directly), presumably so
	// tests can stub it — confirm against the test suite.
	diskStateGetter = disk.UsageWithContext

	// Shard-level metrics, wired up in init below.
	diskStateGauge       meter.Gauge
	flushBytes           meter.Counter
	flushNum             meter.Counter
	flushLatency         meter.Histogram
	receivedBytesCounter meter.Counter
	receivedNumCounter   meter.Counter
	onDiskBytesGauge     meter.Gauge
)
// init registers all shard-level metrics with the shard meter provider.
// Each label set extends the common shard labels with per-metric dimensions
// such as "kind" and "is_error".
func init() {
	labelNames := common.ShardLabelNames()
	diskStateGauge = shardProvider.Gauge("disk_state", append(labelNames, "kind")...)
	flushBytes = shardProvider.Counter("flush_bytes", labelNames...)
	flushNum = shardProvider.Counter("flush_num", append(labelNames, "is_error")...)
	flushLatency = shardProvider.Histogram("flush_latency", meter.DefBuckets, labelNames...)
	receivedBytesCounter = shardProvider.Counter("received_bytes", append(labelNames, "kind")...)
	receivedNumCounter = shardProvider.Counter("received_num", append(labelNames, "kind", "is_error")...)
	onDiskBytesGauge = shardProvider.Gauge("on_disk_bytes", append(labelNames, "kind")...)
}
// shard is the concrete Shard implementation. It owns one series database,
// one index database, and a segment controller that manages the shard's
// time-partitioned segments and their blocks.
type shard struct {
	// seriesDatabase stores the shard's series data on disk.
	seriesDatabase SeriesDatabase
	// indexDatabase provides index access built over the segment controller.
	indexDatabase IndexDatabase
	l             *logger.Logger
	// segmentController opens, tracks and retires the shard's segments.
	segmentController *segmentController
	// segmentManageStrategy drives segment rotation (see bucket.NewStrategy).
	segmentManageStrategy *bucket.Strategy
	// scheduler runs the shard's periodic tasks, e.g. the retention task.
	scheduler *timestamp.Scheduler
	// position identifies this shard in metric label values and logs.
	position common.Position
	// closeOnce ensures Close tears the shard down at most once.
	closeOnce sync.Once
	id        common.ShardID
}
// OpenShard returns an existed Shard or create a new one if not existed.
//
// root is the directory under which the shard's own directory is created.
// segmentSize, blockSize and ttl configure segment rotation, block rotation
// and data retention. openedBlockSize bounds how many blocks stay open at
// once; maxOpenedBlockSize is the hard cap for that queue.
func OpenShard(ctx context.Context, id common.ShardID,
	root string, segmentSize, blockSize, ttl IntervalRule, openedBlockSize, maxOpenedBlockSize int,
) (Shard, error) {
	path, err := mkdir(shardTemplate, root, int(id))
	if err != nil {
		return nil, errors.Wrapf(err, "make the directory of the shard %d ", int(id))
	}
	l := logger.Fetch(ctx, "shard"+strconv.Itoa(int(id)))
	l.Info().Int("shard_id", int(id)).Str("path", path).Msg("creating a shard")
	// Normalize the queue sizes: fall back to defaults for non-positive
	// values, and never let the hard cap sit below the soft bound.
	if openedBlockSize < 1 {
		openedBlockSize = defaultBlockQueueSize
	}
	// Fix: defaultMaxBlockQueueSize was declared but never applied, so an
	// invalid (e.g. zero) maxOpenedBlockSize flowed straight into the
	// segment controller.
	if maxOpenedBlockSize < 1 {
		maxOpenedBlockSize = defaultMaxBlockQueueSize
	}
	if maxOpenedBlockSize < openedBlockSize {
		maxOpenedBlockSize = openedBlockSize
	}
	shardCtx := context.WithValue(ctx, logger.ContextKey, l)
	shardCtx = common.SetPosition(shardCtx, func(p common.Position) common.Position {
		p.Shard = strconv.Itoa(int(id))
		return p
	})
	// NOTE(review): the second return value of GetClock is deliberately
	// ignored — presumably a found/ok flag; confirm in pkg/timestamp.
	clock, _ := timestamp.GetClock(shardCtx)
	scheduler := timestamp.NewScheduler(l, clock)
	sc, err := newSegmentController(shardCtx, path, segmentSize, blockSize, openedBlockSize, maxOpenedBlockSize, l, scheduler)
	if err != nil {
		return nil, errors.Wrapf(err, "create the segment controller of the shard %d", int(id))
	}
	s := &shard{
		id:                id,
		segmentController: sc,
		l:                 l,
		scheduler:         scheduler,
	}
	err = s.segmentController.open()
	if err != nil {
		return nil, err
	}
	seriesPath, err := mkdir(seriesTemplate, path)
	if err != nil {
		return nil, err
	}
	sdb, err := newSeriesDataBase(shardCtx, s.id, seriesPath, s.segmentController)
	if err != nil {
		return nil, err
	}
	s.seriesDatabase = sdb
	idb := newIndexDatabase(shardCtx, s.id, s.segmentController)
	s.indexDatabase = idb
	s.segmentManageStrategy, err = bucket.NewStrategy(s.segmentController, bucket.WithLogger(s.l))
	if err != nil {
		return nil, err
	}
	s.segmentManageStrategy.Run()
	s.position = common.GetPosition(shardCtx)
	// Periodically drop data older than ttl.
	retentionTask := newRetentionTask(s.segmentController, ttl)
	if err := scheduler.Register("retention", retentionTask.option, retentionTask.expr, retentionTask.run); err != nil {
		return nil, err
	}
	plv := s.position.ShardLabelValues()
	// Register a collector that samples disk usage and on-disk sizes on each
	// metrics-collection cycle.
	// NOTE(review): the closure captures the caller's ctx, not shardCtx; if
	// the caller cancels ctx after OpenShard returns, diskStateGetter will
	// start failing — confirm the intended lifetime.
	observability.MetricsCollector.Register(strings.Join(plv, "-"), func() {
		if stat, err := diskStateGetter(ctx, path); err != nil {
			s.l.Error().Err(err).Msg("get disk usage stat")
		} else {
			diskStateGauge.Set(stat.UsedPercent, append(plv, "used_percent")...)
			diskStateGauge.Set(float64(stat.Free), append(plv, "free")...)
			diskStateGauge.Set(float64(stat.Total), append(plv, "total")...)
			diskStateGauge.Set(float64(stat.Used), append(plv, "used")...)
			diskStateGauge.Set(float64(stat.InodesUsed), append(plv, "inodes_used")...)
			diskStateGauge.Set(float64(stat.InodesFree), append(plv, "inodes_free")...)
			diskStateGauge.Set(float64(stat.InodesTotal), append(plv, "inodes_total")...)
			diskStateGauge.Set(stat.InodesUsedPercent, append(plv, "inodes_used_percent")...)
		}
		s.collectSizeOnDisk(plv)
	})
	return s, nil
}
// ID returns the shard's identifier.
func (s *shard) ID() common.ShardID {
	return s.id
}
// Series returns the shard's series database.
func (s *shard) Series() SeriesDatabase {
	return s.seriesDatabase
}
// Index returns the shard's index database.
func (s *shard) Index() IndexDatabase {
	return s.indexDatabase
}
// collectSizeOnDisk publishes the on-disk byte totals of the shard's series
// database, global and local indices, and time-series tables as gauges,
// tagging each sample with the given label values plus a "kind" label.
func (s *shard) collectSizeOnDisk(labelsValues []string) {
	var (
		globalIndexBytes int64
		lsmBytes         int64
		invertedBytes    int64
		tableBytes       int64
	)
	for _, segment := range s.segmentController.segments() {
		if segment.globalIndex != nil {
			globalIndexBytes += segment.globalIndex.SizeOnDisk()
		}
		for _, blk := range segment.blockController.blocks() {
			// Closed blocks are skipped; only open blocks report sizes.
			if blk.Closed() {
				continue
			}
			if blk.tsTable != nil {
				tableBytes += blk.tsTable.SizeOnDisk()
			}
			if blk.lsmIndex != nil {
				lsmBytes += blk.lsmIndex.SizeOnDisk()
			}
			if blk.invertedIndex != nil {
				invertedBytes += blk.invertedIndex.SizeOnDisk()
			}
		}
	}
	onDiskBytesGauge.Set(float64(s.seriesDatabase.SizeOnDisk()), append(labelsValues, "series")...)
	onDiskBytesGauge.Set(float64(globalIndexBytes), append(labelsValues, "global_index")...)
	onDiskBytesGauge.Set(float64(lsmBytes), append(labelsValues, "local_lsm")...)
	onDiskBytesGauge.Set(float64(invertedBytes), append(labelsValues, "local_inverted")...)
	onDiskBytesGauge.Set(float64(tableBytes), append(labelsValues, "tst")...)
}
// State takes a snapshot of the shard: the strategy managers in effect, the
// state of every block in every segment, and the currently open blocks
// sorted by segment then block ID. The snapshot is traced before returning.
func (s *shard) State() (shardState ShardState) {
	shardState.StrategyManagers = append(shardState.StrategyManagers, s.segmentManageStrategy.String())
	for _, segment := range s.segmentController.segments() {
		if segment.blockManageStrategy != nil {
			shardState.StrategyManagers = append(shardState.StrategyManagers, segment.blockManageStrategy.String())
		}
		for _, blk := range segment.blockController.blocks() {
			state := BlockState{
				ID: BlockID{
					SegID:   blk.segID,
					BlockID: blk.blockID,
				},
				TimeRange: blk.TimeRange,
				Closed:    blk.Closed(),
			}
			shardState.Blocks = append(shardState.Blocks, state)
		}
	}
	openedBlocks := s.segmentController.blockQueue.All()
	shardState.OpenBlocks = make([]BlockID, len(openedBlocks))
	for i, item := range openedBlocks {
		shardState.OpenBlocks[i] = item.(BlockID)
	}
	// Order by segment first, then by block within the segment.
	sort.Slice(shardState.OpenBlocks, func(i, j int) bool {
		a, b := shardState.OpenBlocks[i], shardState.OpenBlocks[j]
		if a.SegID != b.SegID {
			return a.SegID < b.SegID
		}
		return a.BlockID < b.BlockID
	})
	s.l.Trace().Interface("", shardState).Msg("shard state")
	return shardState
}
// TriggerSchedule manually fires the scheduled task registered under the
// given name. The boolean is whatever the scheduler reports — presumably
// whether the task exists and was triggered; confirm in pkg/timestamp.
func (s *shard) TriggerSchedule(task string) bool {
	return s.scheduler.Trigger(task)
}
// Close shuts the shard down exactly once: it stops the scheduler and the
// segment management strategy, then closes the segment controller (bounded
// by a 5-second timeout) and the series database, combining their errors.
// NOTE: calls after the first return nil because of closeOnce.
func (s *shard) Close() (err error) {
	s.closeOnce.Do(func() {
		// Stop background work before tearing down storage.
		s.scheduler.Close()
		s.segmentManageStrategy.Close()
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		err = multierr.Combine(s.segmentController.close(ctx), s.seriesDatabase.Close())
	})
	return err
}
// IntervalUnit denotes the unit of a time point.
type IntervalUnit int
// Available IntervalUnits. HOUR and DAY are adequate for the APM scenario.
const (
HOUR IntervalUnit = iota
DAY
)
func (iu IntervalUnit) String() string {
switch iu {
case HOUR:
return "hour"
case DAY:
return "day"
}
panic("invalid interval unit")
}
// IntervalRule defines a length of two points in time.
type IntervalRule struct {
Unit IntervalUnit
Num int
}
func (ir IntervalRule) nextTime(current time.Time) time.Time {
switch ir.Unit {
case HOUR:
return current.Add(time.Hour * time.Duration(ir.Num))
case DAY:
return current.AddDate(0, 0, ir.Num)
}
panic("invalid interval unit")
}
func (ir IntervalRule) estimatedDuration() time.Duration {
switch ir.Unit {
case HOUR:
return time.Hour * time.Duration(ir.Num)
case DAY:
return 24 * time.Hour * time.Duration(ir.Num)
}
panic("invalid interval unit")
}
// parser converts the suffix of a directory entry's name into the point in
// time it encodes.
type parser interface {
	Parse(value string) (time.Time, error)
}
// loadSections walks root for entries matching prefix, parses each entry's
// suffix into a section start time, and invokes loadFn once per section with
// its [start, end) range in ascending time order. Each section ends where
// its successor begins; the newest section's end is derived from
// intervalRule.
func loadSections(root, prefix string, parser parser, intervalRule IntervalRule, loadFn func(start, end time.Time) error) error {
	var startTimeLst []time.Time
	if err := walkDir(
		root,
		prefix,
		func(suffix string) error {
			startTime, err := parser.Parse(suffix)
			if err != nil {
				return err
			}
			startTimeLst = append(startTimeLst, startTime)
			return nil
		}); err != nil {
		return err
	}
	// Fix: the previous comparator compared the slice indices (i < j), which
	// is trivially true for every pair sort.Slice probes, so the list was
	// never actually ordered and directory-walk order leaked into the
	// section ranges. Compare the parsed start times instead.
	sort.Slice(startTimeLst, func(i, j int) bool { return startTimeLst[i].Before(startTimeLst[j]) })
	for i, start := range startTimeLst {
		var end time.Time
		if i < len(startTimeLst)-1 {
			end = startTimeLst[i+1]
		} else {
			// The newest section has no successor; estimate its end.
			end = intervalRule.nextTime(start)
		}
		if err := loadFn(start, end); err != nil {
			return err
		}
	}
	return nil
}