in pkg/wal/repository.go [47:160]
// Open prepares the repository's storage directory and registers the segment
// files already present in it.
//
// The storage directory is created if it does not exist. Every regular entry
// with a ".wal" extension is then examined:
//   - files that IsSegment rejects are removed;
//   - each remaining file is opened and closed once (to trigger a repair if
//     necessary);
//   - files whose name cannot be parsed, or that contain only the 8-byte
//     header, are skipped (the latter are also removed);
//   - all other segments are added to the index, and a WAL is created for the
//     segment's prefix if one is not already registered.
//
// Per-file problems are logged and the file is skipped. An error is returned
// only when the storage directory is missing from the options, cannot be
// stat'ed, created, opened or read, is not a directory, or when creating a WAL
// fails.
func (s *Repository) Open(ctx context.Context) error {
	if s.opts.StorageDir == "" {
		return fmt.Errorf("storage dir is required")
	}

	stat, err := os.Stat(s.opts.StorageDir)
	switch {
	case os.IsNotExist(err):
		if err := os.MkdirAll(s.opts.StorageDir, 0755); err != nil {
			return err
		}
	case err != nil:
		return err
	case !stat.IsDir():
		return fmt.Errorf("storage dir is not a directory")
	}

	dir, err := os.Open(s.opts.StorageDir)
	if err != nil {
		return err
	}
	// The handle is only used for listing; release it on every return path so
	// the descriptor is not leaked. A close error on a read-only directory
	// handle is not actionable here, so it is intentionally ignored.
	defer dir.Close()

	for {
		// Read the directory in batches rather than loading every entry at once.
		entries, err := dir.ReadDir(100)
		if err == io.EOF {
			break
		} else if err != nil {
			return err
		}

		for _, d := range entries {
			path := filepath.Join(s.opts.StorageDir, d.Name())
			if d.IsDir() || filepath.Ext(path) != ".wal" {
				continue
			}

			// This block was added when we had a non-backwards-compatible change in the segment
			// file format. To simplify the migration, we just remove any segment files that are not in the
			// expected format. In the future, we will have a versioned segment file format to avoid this.
			if !IsSegment(path) {
				logger.Warnf("Segment file is not a WAL segment file: %s. Removing", path)
				if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
					logger.Warnf("Failed to remove invalid segment file: %s %s", path, err.Error())
				}
				continue
			}

			// Walk each segment on disk and Open it to trigger a repair if necessary.
			// A failure here is only logged; the file is still considered below.
			if seg, err := Open(path); err != nil {
				logger.Warnf("Failed to open segment file: %s %s", path, err.Error())
			} else if err := seg.Close(); err != nil {
				logger.Warnf("Failed to close segment file: %s %s", path, err.Error())
			}

			fileName := filepath.Base(path)
			database, table, schema, epoch, err := ParseFilename(fileName)
			if err != nil {
				continue
			}

			// The prefix is "<database>_<table>", with "_<schema>" appended when
			// the filename carries a schema component.
			prefix := fmt.Sprintf("%s_%s", database, table)
			if schema != "" {
				prefix = fmt.Sprintf("%s_%s", prefix, schema)
			}

			createdAt, err := flakeutil.ParseFlakeID(epoch)
			if err != nil {
				logger.Warnf("Failed to parse flake id: %s %s", epoch, err.Error())
				continue
			}

			fi, err := d.Info()
			if err != nil {
				logger.Warnf("Failed to get file info: %s %s", path, err.Error())
				continue
			}

			// If the segment is only 8 bytes, that means only the segment magic header has been written and there
			// is no data in the file. We don't want to upload these to kusto so they can be removed.
			if fi.Size() == 8 {
				if logger.IsDebug() {
					logger.Debugf("Removing empty segment: %s", path)
				}
				if err := os.Remove(path); err != nil {
					logger.Warnf("Failed to remove empty segment: %s %s", path, err.Error())
				}
				continue
			}

			s.index.Add(SegmentInfo{
				Prefix:    prefix,
				Ulid:      epoch,
				Path:      path,
				Size:      fi.Size(),
				CreatedAt: createdAt,
			})

			// Only one WAL is registered per prefix; later segments with the
			// same prefix reuse the existing entry.
			if _, ok := s.wals.Get(prefix); ok {
				continue
			}

			wal, err := s.newWAL(ctx, prefix)
			if err != nil {
				return err
			}
			s.wals.Set(prefix, wal)
		}
	}

	return nil
}