in internal/cli/praefect/subcmd_track_repository.go [128:273]
// execRequest tracks the repository described by req in the Praefect database
// and arranges for it to be replicated to the selected secondary storages.
//
// The primary is chosen according to cfg.Failover.ElectionStrategy:
//   - With the per-repository strategy, req.AuthoritativeStorage becomes the
//     primary and every other node configured for req.VirtualStorage is a
//     candidate secondary. When a default replication factor is configured for
//     the virtual storage, a random subset of at most replicationFactor-1
//     secondaries is kept.
//   - Otherwise the non-demoted primary is read from the shard_primaries table
//     and no secondaries are selected.
//
// After req.authoritativeRepositoryExists confirms the repository for the
// chosen primary, the repository is recorded via req.trackRepository. For each
// secondary the replication event is then either processed right away (when
// replicateImmediately is true) or put on the replication event queue.
// Progress messages are written to w.
//
// Every returned error is prefixed with trackRepoErrorPrefix. A replication
// event that already exists in the queue is reported on w and is not treated
// as an error.
func (req *trackRepositoryRequest) execRequest(ctx context.Context,
	db *sql.DB,
	cfg config.Config,
	w io.Writer,
	logger log.Logger,
	replicateImmediately bool,
) error {
	logger.WithFields(log.Fields{
		"virtual_storage":       req.VirtualStorage,
		"relative_path":         req.RelativePath,
		"replica_path":          req.ReplicaPath,
		"authoritative_storage": req.AuthoritativeStorage,
	}).Debug("track repository")

	var primary string
	var secondaries []string
	var variableReplicationFactorEnabled, savePrimary bool

	if cfg.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
		savePrimary = true
		primary = req.AuthoritativeStorage

		// Every node of the virtual storage other than the authoritative one
		// is a candidate secondary.
		for _, vs := range cfg.VirtualStorages {
			if vs.Name == req.VirtualStorage {
				for _, node := range vs.Nodes {
					if node.Storage == req.AuthoritativeStorage {
						continue
					}
					secondaries = append(secondaries, node.Storage)
				}
			}
		}

		replicationFactor := cfg.DefaultReplicationFactors()[req.VirtualStorage]
		if replicationFactor > 0 {
			variableReplicationFactorEnabled = true

			// Select random secondaries according to the default replication factor.
			r := rand.New(rand.NewSource(time.Now().UnixNano()))
			r.Shuffle(len(secondaries), func(i, j int) {
				secondaries[i], secondaries[j] = secondaries[j], secondaries[i]
			})

			// The primary counts towards the replication factor. Clamp the cut
			// to the number of candidates so that a factor larger than the
			// available nodes cannot slice out of range.
			if wanted := replicationFactor - 1; wanted < len(secondaries) {
				secondaries = secondaries[:wanted]
			}
		}
	} else {
		savePrimary = false
		if err := db.QueryRowContext(ctx, `SELECT node_name FROM shard_primaries WHERE shard_name = $1 AND demoted = 'false'`, req.VirtualStorage).Scan(&primary); err != nil {
			if errors.Is(err, sql.ErrNoRows) {
				return fmt.Errorf("%s: no primaries found", trackRepoErrorPrefix)
			}
			return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
		}
	}

	authoritativeRepoExists, err := req.authoritativeRepositoryExists(ctx, cfg, logger, w, primary)
	if err != nil {
		return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
	}
	if !authoritativeRepoExists {
		return fmt.Errorf("%s: %w", trackRepoErrorPrefix, errAuthoritativeRepositoryNotExist)
	}

	nodeSet, err := praefect.DialNodes(
		ctx,
		cfg.VirtualStorages,
		protoregistry.GitalyProtoPreregistered,
		nil,
		nil,
		nil,
		logger,
	)
	if err != nil {
		return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
	}
	defer nodeSet.Close()

	store := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames())
	queue := datastore.NewPostgresReplicationEventQueue(db)
	replMgr := praefect.NewReplMgr(
		logger,
		cfg.StorageNames(),
		queue,
		store,
		praefect.StaticHealthChecker(cfg.StorageNames()),
		nodeSet,
	)

	repositoryID, err := req.trackRepository(
		ctx,
		store,
		w,
		primary,
		secondaries,
		savePrimary,
		variableReplicationFactorEnabled,
	)
	if err != nil {
		return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
	}
	fmt.Fprintln(w, "Finished adding new repository to be tracked in praefect database.")

	// All replication events created by this invocation share one correlation ID.
	correlationID := correlation.SafeRandomID()
	connections := nodeSet.Connections()[req.VirtualStorage]

	for _, secondary := range secondaries {
		event := datastore.ReplicationEvent{
			Job: datastore.ReplicationJob{
				RepositoryID:      repositoryID,
				Change:            datastore.UpdateRepo,
				RelativePath:      req.RelativePath,
				ReplicaPath:       req.ReplicaPath,
				VirtualStorage:    req.VirtualStorage,
				SourceNodeStorage: primary,
				TargetNodeStorage: secondary,
			},
			Meta: datastore.Params{datastore.CorrelationIDKey: correlationID},
		}

		if replicateImmediately {
			conn, ok := connections[secondary]
			if !ok {
				return fmt.Errorf("%s: connection for %q not found", trackRepoErrorPrefix, secondary)
			}
			if err := replMgr.ProcessReplicationEvent(ctx, event, conn); err != nil {
				return fmt.Errorf("%s: processing replication event %w", trackRepoErrorPrefix, err)
			}
			fmt.Fprintf(w, "Finished replicating repository to %q.\n", secondary)
			continue
		}

		if _, err := queue.Enqueue(ctx, event); err != nil {
			if errors.As(err, &datastore.ReplicationEventExistsError{}) {
				fmt.Fprintf(w, "replication event queue already has similar entry: %s.\n", err)
				// An existing job for this secondary must not prevent the
				// remaining secondaries from getting their own jobs.
				continue
			}
			return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
		}
		fmt.Fprintf(w, "Added replication job to replicate repository to %q.\n", secondary)
	}

	return nil
}