internal/cli/praefect/subcmd_track_repository.go (323 lines of code) (raw):
package praefect
import (
"context"
"database/sql"
"errors"
"fmt"
"io"
"math/rand"
"time"
"github.com/urfave/cli/v3"
glcli "gitlab.com/gitlab-org/gitaly/v16/internal/cli"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
"gitlab.com/gitlab-org/labkit/correlation"
"google.golang.org/grpc/metadata"
)
const (
trackRepositoryCmdName = "track-repository"
)
func newTrackRepositoryCommand() *cli.Command {
return &cli.Command{
Name: trackRepositoryCmdName,
Usage: "start tracking a repository",
Description: "This command adds a given repository to be tracked by Praefect.\n" +
"It checks if the repository exists on disk on the authoritative storage,\n" +
"and whether database records are absent from tracking the repository.\n" +
"If the 'replicate-immediately' flag is used, the command will attempt to replicate\n" +
"the repository to the secondaries. The command is blocked until the\n" +
"replication finishes. Otherwise, replication jobs will be created and will " +
"be executed eventually by Praefect in the background.\n",
Action: trackRepositoryAction,
Flags: []cli.Flag{
&cli.StringFlag{
Name: paramVirtualStorage,
Usage: "name of the repository's virtual storage",
Required: true,
},
&cli.StringFlag{
Name: paramRelativePath,
Usage: "relative path of the repository on the virtual storage. Usually starts with '@hashed'",
Required: true,
},
&cli.StringFlag{
Name: paramReplicaPath,
Usage: "path of the repository on the physical storage. Can start with '@cluster' or match the relative path",
Required: true,
},
&cli.StringFlag{
Name: paramAuthoritativeStorage,
Usage: "physical storage to consider as authoritative for the repository",
},
&cli.BoolFlag{
Name: "replicate-immediately",
Usage: "kick off a replication immediately",
},
},
Before: func(ctx context.Context, cmd *cli.Command) (context.Context, error) {
if cmd.Args().Present() {
_ = cli.ShowSubcommandHelp(cmd)
return nil, cli.Exit(unexpectedPositionalArgsError{Command: cmd.Name}, 1)
}
return ctx, nil
},
}
}
type trackRepositoryRequest struct {
RelativePath string `json:"relative_path"`
ReplicaPath string `json:"replica_path"`
VirtualStorage string `json:"virtual_storage"`
AuthoritativeStorage string `json:"authoritative_storage"`
}
var errAuthoritativeRepositoryNotExist = errors.New("authoritative repository does not exist")
func trackRepositoryAction(ctx context.Context, cmd *cli.Command) error {
logger := log.ConfigureCommand()
conf, err := readConfig(cmd.String(configFlagName))
if err != nil {
return err
}
virtualStorage := cmd.String(paramVirtualStorage)
relativePath := cmd.String(paramRelativePath)
replicaPath := cmd.String(paramReplicaPath)
authoritativeStorage := cmd.String(paramAuthoritativeStorage)
replicateImmediately := cmd.Bool("replicate-immediately")
if authoritativeStorage == "" {
if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
return glcli.RequiredFlagError(paramAuthoritativeStorage)
}
}
ctx = correlation.ContextWithCorrelation(ctx, correlation.SafeRandomID())
openDBCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
db, err := glsql.OpenDB(openDBCtx, conf.DB)
if err != nil {
return fmt.Errorf("connect to database: %w", err)
}
defer func() { _ = db.Close() }()
req := trackRepositoryRequest{
RelativePath: relativePath,
ReplicaPath: replicaPath,
AuthoritativeStorage: authoritativeStorage,
VirtualStorage: virtualStorage,
}
logger = logger.WithField("correlation_id", correlation.ExtractFromContext(ctx))
return req.execRequest(ctx, db, conf, cmd.Writer, logger, replicateImmediately)
}
const trackRepoErrorPrefix = "attempting to track repository in praefect database"
func (req *trackRepositoryRequest) execRequest(ctx context.Context,
db *sql.DB,
cfg config.Config,
w io.Writer,
logger log.Logger,
replicateImmediately bool,
) error {
logger.WithFields(log.Fields{
"virtual_storage": req.VirtualStorage,
"relative_path": req.RelativePath,
"replica_path": req.ReplicaPath,
"authoritative_storage": req.AuthoritativeStorage,
}).Debug("track repository")
var primary string
var secondaries []string
var variableReplicationFactorEnabled, savePrimary bool
if cfg.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
savePrimary = true
primary = req.AuthoritativeStorage
for _, vs := range cfg.VirtualStorages {
if vs.Name == req.VirtualStorage {
for _, node := range vs.Nodes {
if node.Storage == req.AuthoritativeStorage {
continue
}
secondaries = append(secondaries, node.Storage)
}
}
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
replicationFactor := cfg.DefaultReplicationFactors()[req.VirtualStorage]
if replicationFactor > 0 {
variableReplicationFactorEnabled = true
// Select random secondaries according to the default replication factor.
r.Shuffle(len(secondaries), func(i, j int) {
secondaries[i], secondaries[j] = secondaries[j], secondaries[i]
})
secondaries = secondaries[:replicationFactor-1]
}
} else {
savePrimary = false
if err := db.QueryRowContext(ctx, `SELECT node_name FROM shard_primaries WHERE shard_name = $1 AND demoted = 'false'`, req.VirtualStorage).Scan(&primary); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("%s: no primaries found", trackRepoErrorPrefix)
}
return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
}
}
authoritativeRepoExists, err := req.authoritativeRepositoryExists(ctx, cfg, logger, w, primary)
if err != nil {
return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
}
if !authoritativeRepoExists {
return fmt.Errorf("%s: %w", trackRepoErrorPrefix, errAuthoritativeRepositoryNotExist)
}
nodeSet, err := praefect.DialNodes(
ctx,
cfg.VirtualStorages,
protoregistry.GitalyProtoPreregistered,
nil,
nil,
nil,
logger,
)
if err != nil {
return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
}
defer nodeSet.Close()
store := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames())
queue := datastore.NewPostgresReplicationEventQueue(db)
replMgr := praefect.NewReplMgr(
logger,
cfg.StorageNames(),
queue,
store,
praefect.StaticHealthChecker(cfg.StorageNames()),
nodeSet,
)
repositoryID, err := req.trackRepository(
ctx,
store,
w,
primary,
secondaries,
savePrimary,
variableReplicationFactorEnabled,
)
if err != nil {
return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
}
fmt.Fprintln(w, "Finished adding new repository to be tracked in praefect database.")
correlationID := correlation.SafeRandomID()
connections := nodeSet.Connections()[req.VirtualStorage]
for _, secondary := range secondaries {
event := datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
RepositoryID: repositoryID,
Change: datastore.UpdateRepo,
RelativePath: req.RelativePath,
ReplicaPath: req.ReplicaPath,
VirtualStorage: req.VirtualStorage,
SourceNodeStorage: primary,
TargetNodeStorage: secondary,
},
Meta: datastore.Params{datastore.CorrelationIDKey: correlationID},
}
if replicateImmediately {
conn, ok := connections[secondary]
if !ok {
return fmt.Errorf("%s: connection for %q not found", trackRepoErrorPrefix, secondary)
}
if err := replMgr.ProcessReplicationEvent(ctx, event, conn); err != nil {
return fmt.Errorf("%s: processing replication event %w", trackRepoErrorPrefix, err)
}
fmt.Fprintf(w, "Finished replicating repository to %q.\n", secondary)
continue
}
if _, err := queue.Enqueue(ctx, event); err != nil {
if errors.As(err, &datastore.ReplicationEventExistsError{}) {
fmt.Fprintf(w, "replication event queue already has similar entry: %s.\n", err)
return nil
}
return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
}
fmt.Fprintf(w, "Added replication job to replicate repository to %q.\n", secondary)
}
return nil
}
func (req *trackRepositoryRequest) trackRepository(
ctx context.Context,
ds *datastore.PostgresRepositoryStore,
w io.Writer,
primary string,
secondaries []string,
savePrimary bool,
variableReplicationFactorEnabled bool,
) (int64, error) {
repositoryID, err := ds.ReserveRepositoryID(ctx, req.VirtualStorage, req.RelativePath)
if err != nil {
if errors.Is(err, datastore.ErrRepositoryAlreadyExists) {
fmt.Fprintf(w, "repository is already tracked in praefect database\n")
existingID, err := ds.GetRepositoryID(ctx, req.VirtualStorage, req.RelativePath)
if err != nil {
return 0, fmt.Errorf("GetRepositoryID: %w", err)
}
return existingID, nil
}
return 0, fmt.Errorf("ReserveRepositoryID: %w", err)
}
if err := ds.CreateRepository(
ctx,
repositoryID,
req.VirtualStorage,
req.RelativePath,
req.ReplicaPath,
primary,
nil,
secondaries,
savePrimary,
variableReplicationFactorEnabled,
); err != nil {
return 0, fmt.Errorf("CreateRepository: %w", err)
}
return repositoryID, nil
}
func (req *trackRepositoryRequest) authoritativeRepositoryExists(ctx context.Context, cfg config.Config, logger log.Logger, w io.Writer, nodeName string) (_ bool, returnedErr error) {
for _, vs := range cfg.VirtualStorages {
if vs.Name != req.VirtualStorage {
continue
}
for _, node := range vs.Nodes {
if node.Storage == nodeName {
logger.WithFields(log.Fields{
"replica_path": req.ReplicaPath,
"storage_node": node.Storage,
"node_address": node.Address,
}).Debug("check if repository exists on Gitaly node")
conn, err := subCmdDial(ctx, node.Address, node.Token, defaultDialTimeout)
if err != nil {
return false, fmt.Errorf("error dialing: %w", err)
}
defer func() {
err = conn.Close()
if err != nil {
returnedErr = errors.Join(returnedErr, fmt.Errorf("closing connection %w", err))
}
}()
ctx = metadata.AppendToOutgoingContext(ctx, "client_name", trackRepositoryCmdName)
repositoryClient := gitalypb.NewRepositoryServiceClient(conn)
exists, err := repositoryExists(ctx, repositoryClient, node.Storage, req.ReplicaPath)
if err != nil {
fmt.Fprintf(w, "checking if repository exists %q, %q\n", node.Storage, req.ReplicaPath)
return false, nil
}
return exists, nil
}
}
return false, fmt.Errorf("node %q not found", req.AuthoritativeStorage)
}
return false, fmt.Errorf("virtual storage %q not found", req.VirtualStorage)
}
func repositoryExists(ctx context.Context, client gitalypb.RepositoryServiceClient, storageName, relativePath string) (bool, error) {
response, err := client.RepositoryExists(
ctx,
&gitalypb.RepositoryExistsRequest{
Repository: &gitalypb.Repository{
StorageName: storageName,
RelativePath: relativePath,
},
},
)
return response.GetExists(), err
}