banyand/backup/restore.go (189 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package backup import ( "context" "errors" "fmt" "io" "os" "path/filepath" "strings" "github.com/spf13/cobra" "go.uber.org/multierr" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" "github.com/apache/skywalking-banyandb/banyand/backup/snapshot" "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/pkg/config" "github.com/apache/skywalking-banyandb/pkg/fs/remote" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/version" ) // NewRestoreCommand creates a new restore command. func NewRestoreCommand() *cobra.Command { rootCmd := &cobra.Command{ DisableAutoGenTag: true, Version: version.Build(), Short: "Restore BanyanDB data from remote storage", PersistentPreRunE: func(cmd *cobra.Command, _ []string) error { return config.Load("logging", cmd.Flags()) }, } rootCmd.AddCommand(newRunCommand()) rootCmd.AddCommand(NewTimeDirCommand()) return rootCmd } func newRunCommand() *cobra.Command { var ( source string streamRoot string measureRoot string propertyRoot string fsConfig remote.FsConfig ) cmd := &cobra.Command{ Use: "run", Short: "Restore BanyanDB data from remote storage", RunE: func(_ *cobra.Command, _ []string) error { if streamRoot == "" && measureRoot == "" && propertyRoot == "" { return errors.New("at least one of stream-root-path, measure-root-path, or property-root-path is required") } if source == "" { return errors.New("source is required") } fs, err := newFS(source, &fsConfig) if err != nil { return err } defer fs.Close() var errs error if streamRoot != "" { timeDirPath := filepath.Join(streamRoot, "stream", "time-dir") if data, err := os.ReadFile(timeDirPath); err == nil { timeDir := strings.TrimSpace(string(data)) if err = restoreCatalog(fs, timeDir, streamRoot, commonv1.Catalog_CATALOG_STREAM); err != nil { errs = multierr.Append(errs, fmt.Errorf("stream restore failed: %w", err)) } else { _ = os.Remove(timeDirPath) } } else if !errors.Is(err, os.ErrNotExist) { return err } } if measureRoot != "" { timeDirPath := filepath.Join(measureRoot, "measure", "time-dir") if data, err := os.ReadFile(timeDirPath); err == nil { timeDir := strings.TrimSpace(string(data)) if err = restoreCatalog(fs, timeDir, measureRoot, commonv1.Catalog_CATALOG_MEASURE); err != nil { errs = multierr.Append(errs, fmt.Errorf("measure restore failed: %w", err)) } else { _ = os.Remove(timeDirPath) } } else if !errors.Is(err, os.ErrNotExist) { return err } } if propertyRoot != "" { timeDirPath := filepath.Join(propertyRoot, "property", "time-dir") if data, err := os.ReadFile(timeDirPath); err == nil { timeDir := strings.TrimSpace(string(data)) if err = restoreCatalog(fs, timeDir, propertyRoot, commonv1.Catalog_CATALOG_PROPERTY); err != nil { errs = multierr.Append(errs, fmt.Errorf("property restore failed: %w", err)) } else { _ = os.Remove(timeDirPath) } } else if !errors.Is(err, os.ErrNotExist) { return err } } return errs }, } cmd.Flags().StringVar(&source, "source", "", "Source URL (e.g., file:///backups)") cmd.Flags().StringVar(&streamRoot, "stream-root-path", "/tmp", "Root directory for stream catalog") cmd.Flags().StringVar(&measureRoot, "measure-root-path", "/tmp", "Root directory for measure catalog") cmd.Flags().StringVar(&propertyRoot, "property-root-path", "/tmp", "Root directory for property catalog") cmd.Flags().StringVar(&fsConfig.S3ConfigFilePath, "s3-config-file", "", "Path to the s3 configuration file") cmd.Flags().StringVar(&fsConfig.S3CredentialFilePath, "s3-credential-file", "", "Path to the s3 credential file") cmd.Flags().StringVar(&fsConfig.S3ProfileName, "s3-profile", "", "S3 profile name") return cmd } func restoreCatalog(fs remote.FS, timeDir, rootPath string, catalog commonv1.Catalog) error { catalogName := snapshot.CatalogName(catalog) remotePrefix := filepath.Join(timeDir, catalogName, "/") remoteFiles, err := fs.List(context.Background(), remotePrefix) if err != nil { return fmt.Errorf("failed to list remote files: %w", err) } localDir := filepath.Join(snapshot.LocalDir(rootPath, catalog), storage.DataDir) if err = os.MkdirAll(localDir, storage.DirPerm); err != nil { return fmt.Errorf("failed to create local directory %s: %w", localDir, err) } logger.Infof("Restoring %s to %s from %s", catalogName, localDir, remotePrefix) remoteRelSet := make(map[string]bool) var relPath string for _, remoteFile := range remoteFiles { relPath, err = filepath.Rel(timeDir, remoteFile) if err != nil { return fmt.Errorf("failed to get relative path for %s: %w", remoteFile, err) } remoteRelSet[filepath.ToSlash(relPath)] = true } localFiles, err := getAllFiles(localDir) if err != nil { return fmt.Errorf("failed to list local files: %w", err) } for _, localRelPath := range localFiles { if !remoteRelSet[localRelPath] { localPath := filepath.Join(localDir, localRelPath) if err := os.Remove(localPath); err != nil { return fmt.Errorf("failed to remove local file %s: %w", localPath, err) } cleanEmptyDirs(filepath.Dir(localPath), localDir) } } for _, remoteFile := range remoteFiles { relPath, err := filepath.Rel(filepath.Join(timeDir, catalogName), remoteFile) if err != nil { return fmt.Errorf("failed to get relative path for %s: %w", remoteFile, err) } relPath = filepath.ToSlash(relPath) localPath := filepath.Join(rootPath, catalogName, storage.DataDir, relPath) if !contains(localFiles, relPath) { if err := os.MkdirAll(filepath.Dir(localPath), storage.DirPerm); err != nil { return fmt.Errorf("failed to create directory for %s: %w", localPath, err) } if err := downloadFile(context.Background(), fs, remoteFile, localPath); err != nil { return fmt.Errorf("failed to download %s: %w", remoteFile, err) } logger.Infof("Downloaded %s to %s", remoteFile, localPath) } } return nil } func cleanEmptyDirs(dir, stopDir string) { for { if dir == stopDir || dir == "." { break } entries, err := os.ReadDir(dir) if err != nil || len(entries) > 0 { break } _ = os.Remove(dir) dir = filepath.Dir(dir) } } func downloadFile(ctx context.Context, fs remote.FS, remotePath, localPath string) error { reader, err := fs.Download(ctx, remotePath) if err != nil { return err } defer reader.Close() file, err := os.Create(localPath) if err != nil { return err } defer file.Close() if _, err := io.Copy(file, reader); err != nil { return err } return nil }