in banyand/tsdb/shard.go [85:156]
// OpenShard prepares the shard identified by id under root and returns it.
//
// It makes the shard directory, builds and opens the segment controller,
// makes the series directory, creates the series and index databases, starts
// the segment management strategy, registers the "retention" task with the
// scheduler, and registers a metrics callback that reports disk statistics
// for the shard path.
//
// An openedBlockSize below 1 is replaced by defaultBlockQueueSize.
//
// Every failure is returned wrapped with the step that failed and the shard
// id; the returned Shard is nil in that case.
//
// NOTE(review): components created before a failing step (segment controller,
// strategy, scheduler) are not released on the error paths below — confirm
// whether the caller is expected to clean up or whether cleanup belongs here.
func OpenShard(ctx context.Context, id common.ShardID,
	root string, segmentSize, blockSize, ttl IntervalRule, openedBlockSize, maxOpenedBlockSize int,
) (Shard, error) {
	shardID := int(id)
	path, err := mkdir(shardTemplate, root, shardID)
	if err != nil {
		return nil, errors.Wrapf(err, "make the directory of the shard %d", shardID)
	}
	l := logger.Fetch(ctx, "shard"+strconv.Itoa(shardID))
	l.Info().Int("shard_id", shardID).Str("path", path).Msg("creating a shard")
	if openedBlockSize < 1 {
		openedBlockSize = defaultBlockQueueSize
	}

	// The derived context carries the shard logger and the shard position so
	// that the components built below can pick them up.
	shardCtx := context.WithValue(ctx, logger.ContextKey, l)
	shardCtx = common.SetPosition(shardCtx, func(p common.Position) common.Position {
		p.Shard = strconv.Itoa(shardID)
		return p
	})

	// Only the clock is needed; the second result of GetClock is discarded.
	clock, _ := timestamp.GetClock(shardCtx)
	scheduler := timestamp.NewScheduler(l, clock)
	sc, err := newSegmentController(shardCtx, path, segmentSize, blockSize, openedBlockSize, maxOpenedBlockSize, l, scheduler)
	if err != nil {
		return nil, errors.Wrapf(err, "create the segment controller of the shard %d", shardID)
	}
	s := &shard{
		id:                id,
		segmentController: sc,
		l:                 l,
		scheduler:         scheduler,
	}
	if err = s.segmentController.open(); err != nil {
		return nil, errors.Wrapf(err, "open the segment controller of the shard %d", shardID)
	}

	seriesPath, err := mkdir(seriesTemplate, path)
	if err != nil {
		return nil, errors.Wrapf(err, "make the series directory of the shard %d", shardID)
	}
	sdb, err := newSeriesDataBase(shardCtx, s.id, seriesPath, s.segmentController)
	if err != nil {
		return nil, errors.Wrapf(err, "create the series database of the shard %d", shardID)
	}
	s.seriesDatabase = sdb
	s.indexDatabase = newIndexDatabase(shardCtx, s.id, s.segmentController)

	s.segmentManageStrategy, err = bucket.NewStrategy(s.segmentController, bucket.WithLogger(s.l))
	if err != nil {
		return nil, errors.Wrapf(err, "create the segment manage strategy of the shard %d", shardID)
	}
	s.segmentManageStrategy.Run()
	s.position = common.GetPosition(shardCtx)

	retentionTask := newRetentionTask(s.segmentController, ttl)
	if err = scheduler.Register("retention", retentionTask.option, retentionTask.expr, retentionTask.run); err != nil {
		return nil, errors.Wrapf(err, "register the retention task of the shard %d", shardID)
	}

	plv := s.position.ShardLabelValues()
	// Clip the capacity so that every append in the callback below copies the
	// label values instead of writing into one shared backing array.
	plv = plv[:len(plv):len(plv)]
	observability.MetricsCollector.Register(strings.Join(plv, "-"), func() {
		// A failed disk query is only logged; the on-disk size is still collected.
		if stat, statErr := diskStateGetter(ctx, path); statErr != nil {
			s.l.Error().Err(statErr).Msg("get disk usage stat")
		} else {
			diskStateGauge.Set(stat.UsedPercent, append(plv, "used_percent")...)
			diskStateGauge.Set(float64(stat.Free), append(plv, "free")...)
			diskStateGauge.Set(float64(stat.Total), append(plv, "total")...)
			diskStateGauge.Set(float64(stat.Used), append(plv, "used")...)
			diskStateGauge.Set(float64(stat.InodesUsed), append(plv, "inodes_used")...)
			diskStateGauge.Set(float64(stat.InodesFree), append(plv, "inodes_free")...)
			diskStateGauge.Set(float64(stat.InodesTotal), append(plv, "inodes_total")...)
			diskStateGauge.Set(stat.InodesUsedPercent, append(plv, "inodes_used_percent")...)
		}
		s.collectSizeOnDisk(plv)
	})
	return s, nil
}