func()

in pkg/wal/repository.go [47:160]


// Open prepares the storage directory and scans it for existing WAL segment
// files, registering each usable segment in the repository index and making
// sure a WAL exists for every segment prefix found.
//
// The storage directory is created (mode 0755) when it does not exist. An
// error is returned when no storage directory is configured, when the path
// exists but is not a directory, when the directory cannot be opened or read,
// or when creating a WAL for a discovered prefix fails. Problems with
// individual segment files are logged and that file is skipped; they do not
// abort the scan.
func (s *Repository) Open(ctx context.Context) error {
	if s.opts.StorageDir == "" {
		return fmt.Errorf("storage dir is required")
	}

	stat, err := os.Stat(s.opts.StorageDir)
	if os.IsNotExist(err) {
		if err := os.MkdirAll(s.opts.StorageDir, 0755); err != nil {
			return err
		}
	} else if err != nil {
		return err
	} else if !stat.IsDir() {
		return fmt.Errorf("storage dir is not a directory")
	}

	dir, err := os.Open(s.opts.StorageDir)
	if err != nil {
		return err
	}
	// The handle is only used for listing entries; release it on every return
	// path so the descriptor is not leaked. A close error on a read-only
	// directory handle carries no actionable information, so it is ignored.
	defer dir.Close()

	// Read the directory in batches of 100 entries rather than loading the
	// whole listing at once.
	for {
		entries, err := dir.ReadDir(100)
		if err == io.EOF {
			break
		} else if err != nil {
			return err
		}

		for _, d := range entries {
			path := filepath.Join(s.opts.StorageDir, d.Name())
			if d.IsDir() || filepath.Ext(path) != ".wal" {
				continue
			}

			// This block was added when we had a non-backwards-compatible change in the segment file
			// format.  To simplify the migration, we just remove any segment files that are not in the
			// expected format.  In the future, we will have a versioned segment file format to avoid this.
			if !IsSegment(path) {
				logger.Warnf("Segment file is not a WAL segment file: %s. Removing", path)
				if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
					logger.Warnf("Failed to remove invalid segment file: %s %s", path, err.Error())
				}
				continue
			}

			// Walk each segment on disk and Open it to trigger a repair if necessary.
			// Failures here are only logged; the file still goes through the
			// filename/size checks below.
			var seg Segment
			seg, err = Open(path)
			if err != nil {
				logger.Warnf("Failed to open segment file: %s %s", path, err.Error())
			} else if err := seg.Close(); err != nil {
				logger.Warnf("Failed to close segment file: %s %s", path, err.Error())
			}

			// Files whose names do not parse are skipped without logging.
			fileName := filepath.Base(path)
			database, table, schema, epoch, err := ParseFilename(fileName)
			if err != nil {
				continue
			}

			// The prefix is "<database>_<table>", with "_<schema>" appended
			// when the filename carries a schema component.
			prefix := fmt.Sprintf("%s_%s", database, table)
			if schema != "" {
				prefix = fmt.Sprintf("%s_%s", prefix, schema)
			}

			createdAt, err := flakeutil.ParseFlakeID(epoch)
			if err != nil {
				logger.Warnf("Failed to parse flake id: %s %s", epoch, err.Error())
				continue
			}

			fi, err := d.Info()
			if err != nil {
				logger.Warnf("Failed to get file info: %s %s", path, err.Error())
				continue
			}

			// If the segment is only 8 bytes, that means only the segment magic header has been written and there
			// is no data in the file.  We don't want to upload these to kusto so they can be removed.
			if fi.Size() == 8 {
				if logger.IsDebug() {
					logger.Debugf("Removing empty segment: %s", path)
				}
				if err := os.Remove(path); err != nil {
					logger.Warnf("Failed to remove empty segment: %s %s", path, err.Error())
				}
				continue
			}

			info := SegmentInfo{
				Prefix:    prefix,
				Ulid:      epoch,
				Path:      path,
				Size:      fi.Size(),
				CreatedAt: createdAt,
			}
			s.index.Add(info)

			// Only one WAL is created per prefix; later segments sharing the
			// prefix reuse the one already registered.
			_, ok := s.wals.Get(prefix)
			if ok {
				continue
			}

			wal, err := s.newWAL(ctx, prefix)
			if err != nil {
				return err
			}
			s.wals.Set(prefix, wal)
		}
	}

	return nil
}