in banyand/stream/tstable.go [169:247]
// newTSTable creates the tsTable stored under rootPath and starts its loop.
//
// It opens the element index, then scans rootPath: every directory other than
// the element index directory is treated as a part (named by its epoch), and
// every file carrying snapshotSuffix is treated as a snapshot. Entries whose
// name cannot be parsed, or parts whose metadata fails validation, are logged
// and removed from disk. When at least one part and one snapshot survive, the
// snapshot with the highest epoch is loaded and the loop starts from that
// epoch; otherwise the loop starts from the current wall-clock time in
// nanoseconds.
//
// The time range argument is ignored. m must be nil or a *metrics value; any
// other dynamic type makes the type assertion panic. The only error returned
// is the one produced while creating the element index.
func newTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position,
	l *logger.Logger, _ timestamp.TimeRange, option option, m any,
) (*tsTable, error) {
	table := &tsTable{
		fileSystem: fileSystem,
		root:       rootPath,
		option:     option,
		l:          l,
		p:          p,
	}

	var indexMetrics *inverted.Metrics
	if m != nil {
		table.metrics = m.(*metrics)
		indexMetrics = table.metrics.indexMetrics
	}

	// The flush timeout is handed to the element index in whole seconds.
	flushTimeoutSeconds := option.elementIndexFlushTimeout.Nanoseconds() / int64(time.Second)
	elemIndex, err := newElementIndex(context.TODO(), rootPath, flushTimeoutSeconds, indexMetrics)
	if err != nil {
		return nil, err
	}
	table.index = elemIndex
	table.gc.init(table)

	var (
		partEpochs     []uint64
		snapshotEpochs []uint64
		invalidNames   []string
	)
	// An empty directory simply leaves all three slices empty, which falls
	// through to the "fresh table" path at the bottom.
	for _, entry := range fileSystem.ReadDir(rootPath) {
		if !entry.IsDir() {
			// Plain files are only interesting when they are snapshots.
			fileName := entry.Name()
			if filepath.Ext(fileName) != snapshotSuffix {
				continue
			}
			snapshotEpoch, parseErr := parseSnapshot(fileName)
			if parseErr != nil {
				l.Info().Err(parseErr).Msg("cannot parse snapshot file name. skip and delete it")
				invalidNames = append(invalidNames, fileName)
				continue
			}
			snapshotEpochs = append(snapshotEpochs, snapshotEpoch)
			continue
		}

		dirName := entry.Name()
		if dirName == elementIndexFilename {
			// The element index lives next to the parts and is not a part itself.
			continue
		}
		partEpoch, parseErr := parseEpoch(dirName)
		if parseErr != nil {
			l.Info().Err(parseErr).Msg("cannot parse part file name. skip and delete it")
			invalidNames = append(invalidNames, dirName)
			continue
		}
		if validateErr := validatePartMetadata(fileSystem, filepath.Join(rootPath, dirName)); validateErr != nil {
			l.Info().Err(validateErr).Msg("cannot validate part metadata. skip and delete it")
			invalidNames = append(invalidNames, dirName)
			continue
		}
		partEpochs = append(partEpochs, partEpoch)
	}

	// Clean up everything that was rejected during the scan.
	for _, name := range invalidNames {
		target := filepath.Join(rootPath, name)
		l.Info().Str("path", target).Msg("delete invalid directory or file")
		fileSystem.MustRMAll(target)
	}

	if len(partEpochs) > 0 && len(snapshotEpochs) > 0 {
		// Ascending order puts the most recent snapshot epoch last.
		sort.Slice(snapshotEpochs, func(a, b int) bool {
			return snapshotEpochs[a] < snapshotEpochs[b]
		})
		latest := snapshotEpochs[len(snapshotEpochs)-1]
		table.loadSnapshot(latest, partEpochs)
		table.startLoop(latest)
		return table, nil
	}

	// Nothing usable on disk: start a fresh table at the current time.
	table.startLoop(uint64(time.Now().UnixNano()))
	return table, nil
}