in banyand/tsdb/segment.go [51:108]
// openSegment creates the segment rooted at path and opens its block controller.
//
// suffix must parse as a decimal integer; together with segmentSize.Unit it
// yields the segment's internal ID. When ctx carries a DatabaseOpts value under
// OptionsKey with EnableGlobalIndex set, a global index store is opened in the
// segment's index directory, sized by GlobalIndexMemSize (defaultKVMemorySize
// when that field is zero). The block-management strategy is created and run
// only if the segment's End is after the clock's current time.
//
// On any failure a nil segment and the underlying error are returned.
func openSegment(ctx context.Context, startTime, endTime time.Time, path, suffix string,
	segmentSize, blockSize IntervalRule, blockQueue bucket.Queue, scheduler *timestamp.Scheduler,
) (s *segment, err error) {
	seq, err := strconv.Atoi(suffix)
	if err != nil {
		return nil, err
	}

	segID := GenerateInternalID(segmentSize.Unit, seq)
	span := timestamp.NewSectionTimeRange(startTime, endTime)
	s = &segment{
		id:        segID,
		path:      path,
		suffix:    suffix,
		TimeRange: span,
		position:  common.GetPosition(ctx),
	}

	// The logger is named after the segment, so it can only be fetched once
	// the identifying fields above are populated.
	segLogger := logger.Fetch(ctx, s.String())
	s.l = segLogger

	// The block controller receives a context carrying the segment logger and
	// whatever GetClock adds to it.
	clock, segCtx := timestamp.GetClock(context.WithValue(ctx, logger.ContextKey, segLogger))
	s.blockController = newBlockController(segCtx, segID, suffix, path, span, blockSize, segLogger, blockQueue, scheduler)
	s.Reporter = bucket.NewTimeBasedReporter(s.String(), span, clock, scheduler)

	if err = s.blockController.open(); err != nil {
		return nil, err
	}
	// NOTE(review): from here on the block controller is open, yet the error
	// returns below hand back nil without closing it — confirm whether the
	// caller or a cleanup hook is expected to release it.

	indexDir, err := mkdir(globalIndexTemplate, path)
	if err != nil {
		return nil, err
	}

	// Options are read from the caller's ctx. The type assertion is
	// intentionally unchecked: a value of any other type panics.
	if raw := ctx.Value(OptionsKey); raw != nil {
		if opts := raw.(DatabaseOpts); opts.EnableGlobalIndex {
			memTableSize := opts.GlobalIndexMemSize
			if memTableSize == 0 {
				memTableSize = defaultKVMemorySize
			}
			s.globalIndex, err = kv.OpenStore(
				indexDir,
				kv.StoreWithLogger(s.l),
				kv.StoreWithMemTableSize(int64(memTableSize)),
			)
			if err != nil {
				return nil, err
			}
		}
	}

	// No strategy is started when the segment's end is not after the current
	// clock time.
	if !s.End.After(clock.Now()) {
		return s, nil
	}

	s.blockManageStrategy, err = bucket.NewStrategy(s.blockController, bucket.WithLogger(s.l))
	if err != nil {
		return nil, err
	}
	s.blockManageStrategy.Run()
	return s, nil
}