banyand/backup/timedir.go (200 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package backup
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"github.com/spf13/cobra"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/pkg/fs/remote"
)
// NewTimeDirCommand builds the "timedir" parent command and attaches the
// list, create, read and delete subcommands to it.
func NewTimeDirCommand() *cobra.Command {
	timeDirCmd := &cobra.Command{
		Use:   "timedir",
		Short: "Manage 'time-dir' files for backup and restoration",
	}
	// The parent defines no run function of its own; the subcommands do the work.
	timeDirCmd.AddCommand(
		newListCmd(),
		newCreateCmd(),
		newReadCmd(),
		newDeleteCmd(),
	)
	return timeDirCmd
}
// newListCmd builds the "list" subcommand, which prints the distinct first
// path segments of the entries returned by the remote file system, sorted
// alphabetically.
//
// --dest is mandatory. --prefix is passed to the remote List call, and the
// s3-* flags populate the remote.FsConfig handed to newFS.
func newListCmd() *cobra.Command {
	var (
		dest     string
		fsConfig remote.FsConfig
		prefix   string
	)

	run := func(cmd *cobra.Command, _ []string) error {
		if dest == "" {
			return errors.New("--dest is required")
		}
		// Build the remote file system client for the destination URL.
		remoteFS, err := newFS(dest, &fsConfig)
		if err != nil {
			return err
		}
		entries, err := remoteFS.List(context.Background(), prefix)
		if err != nil {
			return fmt.Errorf("failed to list remote files: %w", err)
		}

		// The first path segment of every entry is treated as a time
		// directory; a set removes the duplicates.
		seen := make(map[string]struct{})
		for _, entry := range entries {
			top := filepath.ToSlash(entry)
			if idx := strings.Index(top, "/"); idx >= 0 {
				top = top[:idx]
			}
			// Entries starting with "/" have an empty first segment and are ignored.
			if top == "" {
				continue
			}
			seen[top] = struct{}{}
		}

		// Map iteration order is random, so sort before printing.
		names := make([]string, 0, len(seen))
		for name := range seen {
			names = append(names, name)
		}
		sort.Strings(names)

		out := cmd.OutOrStdout()
		fmt.Fprintln(out, "Remote time directories:")
		for _, name := range names {
			fmt.Fprintln(out, name)
		}
		return nil
	}

	cmd := &cobra.Command{
		Use:   "list",
		Short: "List remote time directories in the remote file system",
		RunE:  run,
	}
	flags := cmd.Flags()
	flags.StringVar(&dest, "dest", "", "Destination URL of the remote file system (e.g., file:///backups)")
	flags.StringVar(&prefix, "prefix", "", "Prefix in the remote file system to list")
	flags.StringVar(&fsConfig.S3ConfigFilePath, "s3-config-file", "", "Path to the s3 configuration file")
	flags.StringVar(&fsConfig.S3CredentialFilePath, "s3-credential-file", "", "Path to the s3 credential file")
	flags.StringVar(&fsConfig.S3ProfileName, "s3-profile", "", "S3 profile name")
	return cmd
}
// newCreateCmd builds the "create" subcommand, which writes a 'time-dir' file
// into the directory of each selected catalog.
//
// The file content is the optional positional [time] argument with
// surrounding whitespace trimmed, or, when the argument is omitted, the value
// returned by getTimeDir for the --time-style flag. A [time] argument that is
// empty after trimming is rejected, because it would produce an empty
// time-dir file.
//
// When --catalog is not given, the stream, measure and property catalogs are
// all processed. Unknown catalogs are reported on stderr and skipped. Any
// failure to create the directory or write the file aborts the command with
// an error.
func newCreateCmd() *cobra.Command {
	var catalogs []string
	var streamRoot, measureRoot, propertyRoot string
	var timeStyle string
	cmd := &cobra.Command{
		Use:   "create [time]",
		Short: "Create local 'time-dir' file(s) in catalog directories",
		RunE: func(cmd *cobra.Command, args []string) error {
			// An empty --catalog selection means all three catalogs.
			if len(catalogs) == 0 {
				catalogs = []string{"stream", "measure", "property"}
			}
			var tValue string
			if len(args) > 0 {
				tValue = strings.TrimSpace(args[0])
				// Refuse to write a blank time-dir file.
				if tValue == "" {
					return errors.New("time value must not be empty")
				}
			} else {
				tValue = getTimeDir(timeStyle)
			}
			for _, cat := range catalogs {
				filePath, err := getLocalTimeDirFilePath(cat, streamRoot, measureRoot, propertyRoot)
				if err != nil {
					// Warnings go to stderr, matching the delete subcommand.
					fmt.Fprintf(cmd.ErrOrStderr(), "Skipping unknown catalog '%s': %v\n", cat, err)
					continue
				}
				// Make sure the catalog directory exists before writing into it.
				if err = os.MkdirAll(filepath.Dir(filePath), storage.DirPerm); err != nil {
					return fmt.Errorf("failed to create time-dir directory: %w", err)
				}
				if err = os.WriteFile(filePath, []byte(tValue), storage.FilePerm); err != nil {
					return fmt.Errorf("failed to write time-dir file: %w", err)
				}
				fmt.Fprintf(cmd.OutOrStdout(), "Created time-dir for catalog '%s' at %s with content '%s'\n", cat, filePath, tValue)
			}
			return nil
		},
	}
	cmd.Flags().StringSliceVar(&catalogs, "catalog", nil, "Catalog(s) to create time-dir file (e.g., stream, measure, property). Defaults to all if not provided.")
	cmd.Flags().StringVar(&streamRoot, "stream-root", "/tmp", "Local root directory for stream catalog")
	cmd.Flags().StringVar(&measureRoot, "measure-root", "/tmp", "Local root directory for measure catalog")
	cmd.Flags().StringVar(&propertyRoot, "property-root", "/tmp", "Local root directory for property catalog")
	cmd.Flags().StringVar(&timeStyle, "time-style", "daily", "Time style to compute time string (daily or hourly)")
	return cmd
}
// newReadCmd builds the "read" subcommand, which prints the whitespace-trimmed
// content of the 'time-dir' file of each selected catalog.
//
// When --catalog is not given, the stream, measure and property catalogs are
// all processed. Only the content lines are written to stdout. Every
// diagnostic (unknown catalog, missing file, read failure) goes to stderr.
// Per-catalog problems are reported but never returned, so the command
// itself always returns nil.
func newReadCmd() *cobra.Command {
	var catalogs []string
	var streamRoot, measureRoot, propertyRoot string
	cmd := &cobra.Command{
		Use:   "read",
		Short: "Read local 'time-dir' file(s) from catalog directories",
		RunE: func(cmd *cobra.Command, _ []string) error {
			// If no catalog is specified, process all three.
			if len(catalogs) == 0 {
				catalogs = []string{"stream", "measure", "property"}
			}
			for _, cat := range catalogs {
				filePath, err := getLocalTimeDirFilePath(cat, streamRoot, measureRoot, propertyRoot)
				if err != nil {
					// Warnings go to stderr, like the other diagnostics below
					// and the same warning in the delete subcommand.
					fmt.Fprintf(cmd.ErrOrStderr(), "Skipping unknown catalog '%s': %v\n", cat, err)
					continue
				}
				data, err := os.ReadFile(filePath)
				switch {
				case err == nil:
					fmt.Fprintf(cmd.OutOrStdout(), "Catalog '%s': time-dir content: '%s'\n", cat, strings.TrimSpace(string(data)))
				case os.IsNotExist(err):
					fmt.Fprintf(cmd.ErrOrStderr(), "Catalog '%s': time-dir file not found at %s\n", cat, filePath)
				default:
					fmt.Fprintf(cmd.ErrOrStderr(), "Catalog '%s': failed to read time-dir file at %s: %v\n", cat, filePath, err)
				}
			}
			return nil
		},
	}
	cmd.Flags().StringSliceVar(&catalogs, "catalog", nil, "Catalog(s) to read time-dir file (e.g., stream, measure, property). Defaults to all if not provided.")
	cmd.Flags().StringVar(&streamRoot, "stream-root", "/tmp", "Local root directory for stream catalog")
	cmd.Flags().StringVar(&measureRoot, "measure-root", "/tmp", "Local root directory for measure catalog")
	cmd.Flags().StringVar(&propertyRoot, "property-root", "/tmp", "Local root directory for property catalog")
	return cmd
}
// newDeleteCmd builds the "delete" subcommand, which removes the 'time-dir'
// file of each selected catalog.
//
// When --catalog is not given, the stream, measure and property catalogs are
// all processed. Successful deletions are reported on stdout. Unknown
// catalogs, missing files and removal failures are reported on stderr. The
// command itself always returns nil.
func newDeleteCmd() *cobra.Command {
	var catalogs []string
	var streamRoot, measureRoot, propertyRoot string

	removeTimeDirs := func(cmd *cobra.Command, _ []string) error {
		// An empty --catalog selection means all three catalogs.
		if len(catalogs) == 0 {
			catalogs = []string{"stream", "measure", "property"}
		}
		stdout, stderr := cmd.OutOrStdout(), cmd.ErrOrStderr()
		for _, catalog := range catalogs {
			target, err := getLocalTimeDirFilePath(catalog, streamRoot, measureRoot, propertyRoot)
			if err != nil {
				fmt.Fprintf(stderr, "Skipping unknown catalog '%s': %v\n", catalog, err)
				continue
			}
			switch rmErr := os.Remove(target); {
			case rmErr == nil:
				fmt.Fprintf(stdout, "Deleted time-dir for catalog '%s' at %s\n", catalog, target)
			case os.IsNotExist(rmErr):
				fmt.Fprintf(stderr, "Catalog '%s': time-dir file not found at %s\n", catalog, target)
			default:
				fmt.Fprintf(stderr, "Failed to delete time-dir for catalog '%s' at %s: %v\n", catalog, target, rmErr)
			}
		}
		// Per-catalog failures are only reported, not propagated.
		return nil
	}

	cmd := &cobra.Command{
		Use:   "delete",
		Short: "Delete local 'time-dir' file(s) from catalog directories",
		RunE:  removeTimeDirs,
	}
	flags := cmd.Flags()
	flags.StringSliceVar(&catalogs, "catalog", nil, "Catalog(s) to delete time-dir file (e.g., stream, measure, property). Defaults to all if not provided.")
	flags.StringVar(&streamRoot, "stream-root", "/tmp", "Local root directory for stream catalog")
	flags.StringVar(&measureRoot, "measure-root", "/tmp", "Local root directory for measure catalog")
	flags.StringVar(&propertyRoot, "property-root", "/tmp", "Local root directory for property catalog")
	return cmd
}
// getLocalTimeDirFilePath returns the path of the 'time-dir' file for the
// given catalog, built as <root>/<catalog>/time-dir, where <root> is the root
// argument that belongs to that catalog.
//
// The catalog name is matched case-insensitively and surrounding whitespace
// is ignored, so an element such as " measure" (as produced by a
// comma-separated flag value written with spaces) still resolves. Any name
// other than stream, measure or property yields an error that quotes the
// name exactly as it was passed in.
func getLocalTimeDirFilePath(catalog, streamRoot, measureRoot, propertyRoot string) (string, error) {
	var root, dir string
	switch strings.ToLower(strings.TrimSpace(catalog)) {
	case "stream":
		root, dir = streamRoot, "stream"
	case "measure":
		root, dir = measureRoot, "measure"
	case "property":
		root, dir = propertyRoot, "property"
	default:
		return "", fmt.Errorf("unknown catalog type: %s", catalog)
	}
	return filepath.Join(root, dir, "time-dir"), nil
}