banyand/backup/backup.go (215 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// Package backup provides the backup command-line tool.
package backup

import (
	"context"
	"errors"
	"fmt"
	"net/url"
	"os"
	"os/signal"
	"path"
	"path/filepath"
	"syscall"
	"time"

	"github.com/benbjohnson/clock"
	"github.com/robfig/cron/v3"
	"github.com/spf13/cobra"
	"go.uber.org/multierr"

	"github.com/apache/skywalking-banyandb/banyand/backup/snapshot"
	"github.com/apache/skywalking-banyandb/pkg/config"
	"github.com/apache/skywalking-banyandb/pkg/fs/remote"
	"github.com/apache/skywalking-banyandb/pkg/fs/remote/aws"
	"github.com/apache/skywalking-banyandb/pkg/fs/remote/local"
	"github.com/apache/skywalking-banyandb/pkg/logger"
	"github.com/apache/skywalking-banyandb/pkg/timestamp"
	"github.com/apache/skywalking-banyandb/pkg/version"
)

// backupOptions holds the settings of the backup command. Its fields are
// populated from the command-line flags registered in NewBackupCommand.
type backupOptions struct {
	// fsConfig carries the S3 settings (--s3-config-file, --s3-credential-file,
	// --s3-profile, --s3-checksum-algorithm, --s3-storage-class); newFS only
	// passes it on when dest uses the s3 scheme.
	fsConfig remote.FsConfig

	// gRPCAddr is the data node address snapshots are requested from (--grpc-addr).
	gRPCAddr string

	// cert is the path to the gRPC server certificate (--cert).
	cert string

	// timeStyle selects the remote time directory layout: "hourly", or daily
	// for any other value (--time-style).
	timeStyle string

	// schedule is the periodic backup expression; when empty a single backup
	// is performed (--schedule).
	schedule string

	// streamRoot, measureRoot and propertyRoot are the local root directories
	// used to resolve each catalog's snapshot directory.
	streamRoot   string
	measureRoot  string
	propertyRoot string

	// dest is the destination URL, e.g. file:///backups (--dest).
	dest string

	// enableTLS turns on TLS for the gRPC connection (--enable-tls).
	enableTLS bool

	// insecure skips server certificate verification (--insecure).
	insecure bool
}

// NewBackupCommand creates a new backup command.
func NewBackupCommand() *cobra.Command { var backupOpts backupOptions cmd := &cobra.Command{ Short: "Backup BanyanDB snapshots to remote storage", DisableAutoGenTag: true, Version: version.Build(), RunE: func(cmd *cobra.Command, _ []string) error { if err := config.Load("logging", cmd.Flags()); err != nil { return err } if backupOpts.schedule == "" { return backupAction(backupOpts) } schedLogger := logger.GetLogger().Named("backup-scheduler") schedLogger.Info().Msgf("backup to %s will run with schedule: %s", backupOpts.dest, backupOpts.schedule) clockInstance := clock.New() sch := timestamp.NewScheduler(schedLogger, clockInstance) err := sch.Register("backup", cron.Descriptor, backupOpts.schedule, func(_ time.Time, l *logger.Logger) bool { err := backupAction(backupOpts) if err != nil { l.Error().Err(err).Msg("backup failed") } else { l.Info().Msg("backup succeeded") } return true }) if err != nil { return err } sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) schedLogger.Info().Msg("backup scheduler started, press Ctrl+C to exit") <-sigChan schedLogger.Info().Msg("shutting down backup scheduler...") sch.Close() return nil }, } cmd.Flags().StringVar(&backupOpts.gRPCAddr, "grpc-addr", "127.0.0.1:17912", "gRPC address of the data node") cmd.Flags().BoolVar(&backupOpts.enableTLS, "enable-tls", false, "Enable TLS for gRPC connection") cmd.Flags().BoolVar(&backupOpts.insecure, "insecure", false, "Skip server certificate verification") cmd.Flags().StringVar(&backupOpts.cert, "cert", "", "Path to the gRPC server certificate") cmd.Flags().StringVar(&backupOpts.streamRoot, "stream-root-path", "/tmp", "Root directory for stream catalog") cmd.Flags().StringVar(&backupOpts.measureRoot, "measure-root-path", "/tmp", "Root directory for measure catalog") cmd.Flags().StringVar(&backupOpts.propertyRoot, "property-root-path", "/tmp", "Root directory for property catalog") cmd.Flags().StringVar(&backupOpts.dest, "dest", "", "Destination URL 
(e.g., file:///backups)") cmd.Flags().StringVar(&backupOpts.timeStyle, "time-style", "daily", "Time directory style (daily|hourly)") cmd.Flags().StringVar( &backupOpts.schedule, "schedule", "", "Schedule expression for periodic backup. Options: @yearly, @monthly, @weekly, @daily, @hourly or @every <duration>", ) cmd.Flags().StringVar(&backupOpts.fsConfig.S3ConfigFilePath, "s3-config-file", "", "Path to the s3 configuration file") cmd.Flags().StringVar(&backupOpts.fsConfig.S3CredentialFilePath, "s3-credential-file", "", "Path to the s3 credential file") cmd.Flags().StringVar(&backupOpts.fsConfig.S3ProfileName, "s3-profile", "", "S3 profile name") cmd.Flags().StringVar(&backupOpts.fsConfig.S3ChecksumAlgorithm, "s3-checksum-algorithm", "", "S3 checksum algorithm") cmd.Flags().StringVar(&backupOpts.fsConfig.S3StorageClass, "s3-storage-class", "", "S3 upload storage class") return cmd } func backupAction(options backupOptions) error { if options.dest == "" { return errors.New("dest is required") } fs, err := newFS(options.dest, &options.fsConfig) if err != nil { return err } defer fs.Close() snapshots, err := snapshot.Get(options.gRPCAddr, options.enableTLS, options.insecure, options.cert) if err != nil { return err } timeDir := getTimeDir(options.timeStyle) for _, snp := range snapshots { var snapshotDir string snapshotDir, err = snapshot.Dir(snp, options.streamRoot, options.measureRoot, options.propertyRoot) if err != nil { logger.Warningf("Failed to get snapshot directory for %s: %v", snp.Name, err) continue } multierr.AppendInto(&err, backupSnapshot(fs, snapshotDir, snapshot.CatalogName(snp.Catalog), timeDir)) } return err } func newFS(dest string, config *remote.FsConfig) (remote.FS, error) { u, err := url.Parse(dest) if err != nil { return nil, fmt.Errorf("invalid dest URL: %w", err) } switch u.Scheme { case "file": return local.NewFS(u.Path) case "s3": return aws.NewFS(u.Path, config) default: return nil, fmt.Errorf("unsupported scheme: %s", u.Scheme) } } func 
getTimeDir(style string) string { now := time.Now() switch style { case "hourly": return now.Format("2006-01-02-15") default: return now.Format("2006-01-02") } } func backupSnapshot(fs remote.FS, snapshotDir, catalog, timeDir string) error { localFiles, err := getAllFiles(snapshotDir) if err != nil { return err } ctx := context.Background() remotePrefix := path.Join(timeDir, catalog) + "/" remoteFiles, err := fs.List(ctx, remotePrefix) if err != nil { return err } for _, relPath := range localFiles { remotePath := path.Join(timeDir, catalog, relPath) if !contains(remoteFiles, remotePath) { if err := uploadFile(ctx, fs, snapshotDir, relPath, remotePath); err != nil { return err } } } deleteOrphanedFiles(ctx, fs, localFiles, remoteFiles, timeDir, catalog) return nil } func getAllFiles(root string) ([]string, error) { var files []string err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.IsDir() { relPath, err := filepath.Rel(root, path) if err != nil { return err } files = append(files, filepath.ToSlash(relPath)) } return nil }) return files, err } func uploadFile(ctx context.Context, fs remote.FS, snapshotDir, relPath, remotePath string) error { localPath := filepath.Join(snapshotDir, relPath) file, err := os.Open(localPath) if err != nil { return err } defer file.Close() return fs.Upload(ctx, remotePath, file) } func deleteOrphanedFiles(ctx context.Context, fs remote.FS, localFiles, remoteFiles []string, timeDir, snapshotName string) { expected := make(map[string]struct{}) for _, f := range localFiles { expected[path.Join(timeDir, snapshotName, f)] = struct{}{} } for _, remoteFile := range remoteFiles { if _, exists := expected[remoteFile]; !exists { if err := fs.Delete(ctx, remoteFile); err != nil { logger.Warningf("Warning: failed to delete orphaned file %s: %v\n", remoteFile, err) } } } } func contains(slice []string, s string) bool { for _, item := range slice { if item == s { return true } } 
return false }