func openSegment()

in banyand/tsdb/segment.go [51:108]


// openSegment builds the segment stored under path and opens the resources it owns.
//
// suffix must parse as a base-10 integer; together with segmentSize.Unit it is
// turned into the segment id by GenerateInternalID. startTime and endTime form
// the segment's time range. blockSize, blockQueue and scheduler are handed to
// the block controller; scheduler is also handed to the time-based reporter.
//
// The steps run in this order:
//  1. create the block controller and the reporter, then open the block controller;
//  2. create the global index directory and, when ctx carries a DatabaseOpts
//     under OptionsKey with EnableGlobalIndex set, open the global index store;
//  3. if the segment's end is still after the clock's current time, create and
//     run the block management strategy.
//
// On any failure it returns a nil segment together with the error produced by
// the failing step.
//
// NOTE(review): nothing in this function releases the block controller, the
// reporter or the global index store when a later step fails — confirm whether
// callers are expected to clean up, or add teardown on these error paths.
func openSegment(ctx context.Context, startTime, endTime time.Time, path, suffix string,
	segmentSize, blockSize IntervalRule, blockQueue bucket.Queue, scheduler *timestamp.Scheduler,
) (s *segment, err error) {
	suffixInteger, err := strconv.Atoi(suffix)
	if err != nil {
		return nil, err
	}
	id := GenerateInternalID(segmentSize.Unit, suffixInteger)
	timeRange := timestamp.NewSectionTimeRange(startTime, endTime)
	s = &segment{
		id:        id,
		path:      path,
		suffix:    suffix,
		TimeRange: timeRange,
		position:  common.GetPosition(ctx),
	}

	// The logger is fetched under the segment's own name and stored in the
	// derived context that is passed to the block controller.
	l := logger.Fetch(ctx, s.String())
	s.l = l
	segCtx := context.WithValue(ctx, logger.ContextKey, l)
	clock, segCtx := timestamp.GetClock(segCtx)

	s.blockController = newBlockController(segCtx, id, suffix, path, timeRange, blockSize, l, blockQueue, scheduler)
	s.Reporter = bucket.NewTimeBasedReporter(s.String(), timeRange, clock, scheduler)
	if err = s.blockController.open(); err != nil {
		return nil, err
	}

	// The index directory is created unconditionally, even when the global
	// index ends up disabled below.
	indexPath, err := mkdir(globalIndexTemplate, path)
	if err != nil {
		return nil, err
	}

	// Options are read from the caller's ctx, not from segCtx.
	if o := ctx.Value(OptionsKey); o != nil {
		// Single-value type assertion: this panics if the value stored under
		// OptionsKey is not a DatabaseOpts.
		options := o.(DatabaseOpts)
		if options.EnableGlobalIndex {
			memSize := options.GlobalIndexMemSize
			if memSize == 0 {
				// An unset size falls back to the package default.
				memSize = defaultKVMemorySize
			}
			s.globalIndex, err = kv.OpenStore(
				indexPath,
				kv.StoreWithLogger(s.l),
				kv.StoreWithMemTableSize(int64(memSize)),
			)
			if err != nil {
				return nil, err
			}
		}
	}

	// A segment whose end is not after the current time gets no block
	// management strategy; it is returned as it stands.
	if !s.End.After(clock.Now()) {
		return s, nil
	}

	s.blockManageStrategy, err = bucket.NewStrategy(s.blockController, bucket.WithLogger(s.l))
	if err != nil {
		return nil, err
	}
	s.blockManageStrategy.Run()
	return s, nil
}