func OpenShard()

in banyand/tsdb/shard.go [85:156]


// OpenShard prepares the directory of the shard identified by id under root,
// wires up the shard's components and returns the resulting Shard.
//
// The steps, in order:
//   - make the shard directory from shardTemplate, root and id;
//   - fetch a logger named "shard<id>" and derive a shard-scoped context that
//     carries the logger and a position whose Shard field is the id;
//   - create a scheduler and a segment controller, then open the controller;
//   - make the series directory and create the series and index databases;
//   - create and run the segment management strategy;
//   - register the "retention" task built from ttl on the scheduler;
//   - register a metrics callback that reports disk usage of the shard path
//     and calls collectSizeOnDisk.
//
// An openedBlockSize lower than 1 is replaced by defaultBlockQueueSize.
//
// A non-nil error is returned as soon as any step fails; every error is
// wrapped with the failing step and the shard id.
//
// NOTE(review): on the failure paths nothing created earlier in this function
// (scheduler, segment controller, strategy, ...) is released here — confirm
// whether the caller or a dedicated cleanup is expected to handle that.
func OpenShard(ctx context.Context, id common.ShardID,
	root string, segmentSize, blockSize, ttl IntervalRule, openedBlockSize, maxOpenedBlockSize int,
) (Shard, error) {
	shardID := int(id)
	path, err := mkdir(shardTemplate, root, shardID)
	if err != nil {
		return nil, errors.Wrapf(err, "make the directory of the shard %d", shardID)
	}
	l := logger.Fetch(ctx, "shard"+strconv.Itoa(shardID))
	l.Info().Int("shard_id", shardID).Str("path", path).Msg("creating a shard")
	if openedBlockSize < 1 {
		openedBlockSize = defaultBlockQueueSize
	}

	// Everything created below receives the shard-scoped context so that it
	// sees the shard's logger and position.
	shardCtx := context.WithValue(ctx, logger.ContextKey, l)
	shardCtx = common.SetPosition(shardCtx, func(p common.Position) common.Position {
		p.Shard = strconv.Itoa(shardID)
		return p
	})
	// Only the first value returned by GetClock is needed here.
	clock, _ := timestamp.GetClock(shardCtx)
	scheduler := timestamp.NewScheduler(l, clock)
	sc, err := newSegmentController(shardCtx, path, segmentSize, blockSize, openedBlockSize, maxOpenedBlockSize, l, scheduler)
	if err != nil {
		return nil, errors.Wrapf(err, "create the segment controller of the shard %d", shardID)
	}
	s := &shard{
		id:                id,
		segmentController: sc,
		l:                 l,
		scheduler:         scheduler,
	}
	if err = s.segmentController.open(); err != nil {
		return nil, errors.Wrapf(err, "open the segment controller of the shard %d", shardID)
	}

	seriesPath, err := mkdir(seriesTemplate, path)
	if err != nil {
		return nil, errors.Wrapf(err, "make the series directory of the shard %d", shardID)
	}
	sdb, err := newSeriesDataBase(shardCtx, s.id, seriesPath, s.segmentController)
	if err != nil {
		return nil, errors.Wrapf(err, "create the series database of the shard %d", shardID)
	}
	s.seriesDatabase = sdb
	s.indexDatabase = newIndexDatabase(shardCtx, s.id, s.segmentController)

	s.segmentManageStrategy, err = bucket.NewStrategy(s.segmentController, bucket.WithLogger(s.l))
	if err != nil {
		return nil, errors.Wrapf(err, "create the segment manage strategy of the shard %d", shardID)
	}
	s.segmentManageStrategy.Run()
	s.position = common.GetPosition(shardCtx)

	retentionTask := newRetentionTask(s.segmentController, ttl)
	if err = scheduler.Register("retention", retentionTask.option, retentionTask.expr, retentionTask.run); err != nil {
		return nil, errors.Wrapf(err, "register the retention task of the shard %d", shardID)
	}

	plv := s.position.ShardLabelValues()
	// Clamp the capacity so that every append below allocates its own backing
	// array instead of writing into spare capacity shared with plv, which is
	// also handed to collectSizeOnDisk.
	base := plv[:len(plv):len(plv)]
	observability.MetricsCollector.Register(strings.Join(plv, "-"), func() {
		// A failure to read the disk state is only logged; the size on disk
		// is still collected afterwards.
		if stat, err := diskStateGetter(ctx, path); err != nil {
			s.l.Error().Err(err).Msg("get disk usage stat")
		} else {
			diskStateGauge.Set(stat.UsedPercent, append(base, "used_percent")...)
			diskStateGauge.Set(float64(stat.Free), append(base, "free")...)
			diskStateGauge.Set(float64(stat.Total), append(base, "total")...)
			diskStateGauge.Set(float64(stat.Used), append(base, "used")...)
			diskStateGauge.Set(float64(stat.InodesUsed), append(base, "inodes_used")...)
			diskStateGauge.Set(float64(stat.InodesFree), append(base, "inodes_free")...)
			diskStateGauge.Set(float64(stat.InodesTotal), append(base, "inodes_total")...)
			diskStateGauge.Set(stat.InodesUsedPercent, append(base, "inodes_used_percent")...)
		}
		s.collectSizeOnDisk(plv)
	})
	return s, nil
}