banyand/backup/snapshot/snapshot.go (69 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Package snapshot provides the snapshot backup and restore functionality.
package snapshot
import (
"context"
"errors"
"fmt"
"path/filepath"
"time"
"google.golang.org/grpc"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/pkg/grpchelper"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
// Conn connects to the gRPC server and executes the given function.
func Conn[T any](gRPCAddr string, enableTLS, insecure bool, cert string, delFn func(conn *grpc.ClientConn) (T, error)) (T, error) {
opts, err := grpchelper.SecureOptions(nil, enableTLS, insecure, cert)
if err != nil {
var zero T
return zero, err
}
connection, err := grpchelper.Conn(gRPCAddr, 10*time.Second, opts...)
if err != nil {
var zero T
return zero, fmt.Errorf("failed to connect to gRPC server: %w", err)
}
defer connection.Close()
return delFn(connection)
}
// Get retrieves the snapshots from the gRPC server.
func Get(gRPCAddr string, enableTLS, insecure bool, cert string, groups ...*databasev1.SnapshotRequest_Group) ([]*databasev1.Snapshot, error) {
return Conn(gRPCAddr, enableTLS, insecure, cert, func(conn *grpc.ClientConn) ([]*databasev1.Snapshot, error) {
ctx := context.Background()
client := databasev1.NewSnapshotServiceClient(conn)
snapshotResp, err := client.Snapshot(ctx, &databasev1.SnapshotRequest{Groups: groups})
if err != nil {
return nil, fmt.Errorf("failed to request snapshot: %w", err)
}
return snapshotResp.Snapshots, nil
})
}
// Dir returns the directory path of the snapshot.
func Dir(snapshot *databasev1.Snapshot, streamRoot, measureRoot, propertyRoot string) (string, error) {
var baseDir string
switch snapshot.Catalog {
case commonv1.Catalog_CATALOG_STREAM:
baseDir = LocalDir(streamRoot, snapshot.Catalog)
case commonv1.Catalog_CATALOG_MEASURE:
baseDir = LocalDir(measureRoot, snapshot.Catalog)
case commonv1.Catalog_CATALOG_PROPERTY:
baseDir = LocalDir(propertyRoot, snapshot.Catalog)
default:
return "", errors.New("unknown catalog type")
}
return filepath.Join(baseDir, storage.SnapshotsDir, snapshot.Name), nil
}
// LocalDir returns the local directory path of the snapshot.
func LocalDir(rootPath string, catalog commonv1.Catalog) string {
return filepath.Join(rootPath, CatalogName(catalog))
}
// CatalogName returns the name of the catalog.
func CatalogName(catalog commonv1.Catalog) string {
switch catalog {
case commonv1.Catalog_CATALOG_STREAM:
return "stream"
case commonv1.Catalog_CATALOG_MEASURE:
return "measure"
case commonv1.Catalog_CATALOG_PROPERTY:
return "property"
default:
logger.Panicf("unknown catalog type: %v", catalog)
return ""
}
}