func()

in internal/cli/praefect/subcmd_track_repository.go [128:273]


func (req *trackRepositoryRequest) execRequest(ctx context.Context,
	db *sql.DB,
	cfg config.Config,
	w io.Writer,
	logger log.Logger,
	replicateImmediately bool,
) error {
	logger.WithFields(log.Fields{
		"virtual_storage":       req.VirtualStorage,
		"relative_path":         req.RelativePath,
		"replica_path":          req.ReplicaPath,
		"authoritative_storage": req.AuthoritativeStorage,
	}).Debug("track repository")

	var primary string
	var secondaries []string
	var variableReplicationFactorEnabled, savePrimary bool
	if cfg.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
		savePrimary = true
		primary = req.AuthoritativeStorage

		for _, vs := range cfg.VirtualStorages {
			if vs.Name == req.VirtualStorage {
				for _, node := range vs.Nodes {
					if node.Storage == req.AuthoritativeStorage {
						continue
					}
					secondaries = append(secondaries, node.Storage)
				}
			}
		}

		r := rand.New(rand.NewSource(time.Now().UnixNano()))
		replicationFactor := cfg.DefaultReplicationFactors()[req.VirtualStorage]

		if replicationFactor > 0 {
			variableReplicationFactorEnabled = true
			// Select random secondaries according to the default replication factor.
			r.Shuffle(len(secondaries), func(i, j int) {
				secondaries[i], secondaries[j] = secondaries[j], secondaries[i]
			})

			secondaries = secondaries[:replicationFactor-1]
		}
	} else {
		savePrimary = false
		if err := db.QueryRowContext(ctx, `SELECT node_name FROM shard_primaries WHERE shard_name = $1 AND demoted = 'false'`, req.VirtualStorage).Scan(&primary); err != nil {
			if errors.Is(err, sql.ErrNoRows) {
				return fmt.Errorf("%s: no primaries found", trackRepoErrorPrefix)
			}
			return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
		}
	}

	authoritativeRepoExists, err := req.authoritativeRepositoryExists(ctx, cfg, logger, w, primary)
	if err != nil {
		return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
	}

	if !authoritativeRepoExists {
		return fmt.Errorf("%s: %w", trackRepoErrorPrefix, errAuthoritativeRepositoryNotExist)
	}

	nodeSet, err := praefect.DialNodes(
		ctx,
		cfg.VirtualStorages,
		protoregistry.GitalyProtoPreregistered,
		nil,
		nil,
		nil,
		logger,
	)
	if err != nil {
		return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
	}
	defer nodeSet.Close()

	store := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames())
	queue := datastore.NewPostgresReplicationEventQueue(db)
	replMgr := praefect.NewReplMgr(
		logger,
		cfg.StorageNames(),
		queue,
		store,
		praefect.StaticHealthChecker(cfg.StorageNames()),
		nodeSet,
	)

	repositoryID, err := req.trackRepository(
		ctx,
		store,
		w,
		primary,
		secondaries,
		savePrimary,
		variableReplicationFactorEnabled,
	)
	if err != nil {
		return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
	}

	fmt.Fprintln(w, "Finished adding new repository to be tracked in praefect database.")

	correlationID := correlation.SafeRandomID()
	connections := nodeSet.Connections()[req.VirtualStorage]

	for _, secondary := range secondaries {
		event := datastore.ReplicationEvent{
			Job: datastore.ReplicationJob{
				RepositoryID:      repositoryID,
				Change:            datastore.UpdateRepo,
				RelativePath:      req.RelativePath,
				ReplicaPath:       req.ReplicaPath,
				VirtualStorage:    req.VirtualStorage,
				SourceNodeStorage: primary,
				TargetNodeStorage: secondary,
			},
			Meta: datastore.Params{datastore.CorrelationIDKey: correlationID},
		}
		if replicateImmediately {
			conn, ok := connections[secondary]
			if !ok {
				return fmt.Errorf("%s: connection for %q not found", trackRepoErrorPrefix, secondary)
			}

			if err := replMgr.ProcessReplicationEvent(ctx, event, conn); err != nil {
				return fmt.Errorf("%s: processing replication event %w", trackRepoErrorPrefix, err)
			}

			fmt.Fprintf(w, "Finished replicating repository to %q.\n", secondary)
			continue
		}

		if _, err := queue.Enqueue(ctx, event); err != nil {
			if errors.As(err, &datastore.ReplicationEventExistsError{}) {
				fmt.Fprintf(w, "replication event queue already has similar entry: %s.\n", err)
				return nil
			}

			return fmt.Errorf("%s: %w", trackRepoErrorPrefix, err)
		}
		fmt.Fprintf(w, "Added replication job to replicate repository to %q.\n", secondary)
	}

	return nil
}