func newTSTable()

in banyand/stream/tstable.go [169:247]


// newTSTable builds a tsTable rooted at rootPath, restores whatever state is
// found on disk, and calls startLoop on the table before returning it.
//
// The element index is opened first via newElementIndex; a failure there is the
// only error this function returns. The entries of rootPath are then
// classified. The element index directory is skipped. Every other directory is
// treated as a part: its name must be accepted by parseEpoch and its metadata
// by validatePartMetadata. Regular files carrying snapshotSuffix must be
// accepted by parseSnapshot; any other regular file is ignored. Entries that
// fail these checks are logged and handed to fileSystem.MustRMAll.
//
// When at least one part and one snapshot remain, the snapshot with the
// largest epoch is loaded together with all remaining parts and startLoop is
// called with that epoch. Otherwise startLoop is called with the current time
// in Unix nanoseconds.
//
// m may be a nil interface or a *metrics. A nil *metrics wrapped in the
// interface is treated like a nil interface (no metrics attached). Any other
// dynamic type is a programming error and panics on the type assertion. The
// timestamp.TimeRange argument is unused.
func newTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position,
	l *logger.Logger, _ timestamp.TimeRange, option option, m any,
) (*tsTable, error) {
	tst := tsTable{
		fileSystem: fileSystem,
		root:       rootPath,
		option:     option,
		l:          l,
		p:          p,
	}
	var indexMetrics *inverted.Metrics
	if m != nil {
		// An interface holding a nil *metrics is itself non-nil, so the
		// pointer must be checked before reading indexMetrics through it.
		if tm := m.(*metrics); tm != nil {
			tst.metrics = tm
			indexMetrics = tm.indexMetrics
		}
	}
	// The flush timeout is handed over as a whole number of seconds.
	flushTimeoutSeconds := option.elementIndexFlushTimeout.Nanoseconds() / int64(time.Second)
	index, err := newElementIndex(context.TODO(), rootPath, flushTimeoutSeconds, indexMetrics)
	if err != nil {
		return nil, err
	}
	tst.index = index
	tst.gc.init(&tst)
	t := &tst

	ee := fileSystem.ReadDir(rootPath)
	if len(ee) == 0 {
		t.startLoop(uint64(time.Now().UnixNano()))
		return t, nil
	}

	var loadedParts []uint64
	var loadedSnapshots []uint64
	var needToDelete []string
	for i := range ee {
		name := ee[i].Name()
		if ee[i].IsDir() {
			if name == elementIndexFilename {
				continue
			}
			// Distinct local names keep the position parameter p and the
			// outer err from being shadowed inside the loop.
			partID, parseErr := parseEpoch(name)
			if parseErr != nil {
				l.Info().Err(parseErr).Msg("cannot parse part file name. skip and delete it")
				needToDelete = append(needToDelete, name)
				continue
			}
			if metaErr := validatePartMetadata(fileSystem, filepath.Join(rootPath, name)); metaErr != nil {
				l.Info().Err(metaErr).Msg("cannot validate part metadata. skip and delete it")
				needToDelete = append(needToDelete, name)
				continue
			}
			loadedParts = append(loadedParts, partID)
			continue
		}
		if filepath.Ext(name) != snapshotSuffix {
			continue
		}
		snapshot, snapErr := parseSnapshot(name)
		if snapErr != nil {
			l.Info().Err(snapErr).Msg("cannot parse snapshot file name. skip and delete it")
			needToDelete = append(needToDelete, name)
			continue
		}
		loadedSnapshots = append(loadedSnapshots, snapshot)
	}

	for i := range needToDelete {
		invalidPath := filepath.Join(rootPath, needToDelete[i])
		l.Info().Str("path", invalidPath).Msg("delete invalid directory or file")
		fileSystem.MustRMAll(invalidPath)
	}

	// Without both a part and a snapshot there is nothing to restore.
	if len(loadedParts) == 0 || len(loadedSnapshots) == 0 {
		t.startLoop(uint64(time.Now().UnixNano()))
		return t, nil
	}

	// Sort descending so the snapshot with the largest epoch comes first.
	sort.Slice(loadedSnapshots, func(i, j int) bool {
		return loadedSnapshots[i] > loadedSnapshots[j]
	})
	epoch := loadedSnapshots[0]
	t.loadSnapshot(epoch, loadedParts)
	t.startLoop(epoch)
	return t, nil
}