internal/praefect/datastore/repository_store.go (687 lines of code) (raw):
package datastore
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"gitlab.com/gitlab-org/gitaly/v16/internal/datastructure"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore/glsql"
)
// storages maps the name of a virtual storage to the names of the storages configured for it.
type storages map[string][]string
// GenerationUnknown is used to indicate the lack of a generation number in
// a replication job. Older instances can produce replication jobs
// without a generation number. It is also the value GetGeneration returns
// when a storage has no record of the repository.
const GenerationUnknown = -1
// errWriteToOutdatedNodes is returned by IncrementGeneration when the repository exists but none of
// the given storages had a replica on the repository's current generation, so nothing was incremented.
var errWriteToOutdatedNodes = errors.New("write to outdated nodes")
// DowngradeAttemptedError reports that replicating from a source storage would not bring the target
// storage to a newer generation than the one it already has.
type DowngradeAttemptedError struct {
	// Storage is the name of the storage that would have been downgraded.
	Storage string
	// CurrentGeneration is the generation the storage is on at the moment.
	CurrentGeneration int
	// AttemptedGeneration is the generation that the replication would have applied.
	AttemptedGeneration int
}

// Error renders the attempted downgrade as a human readable message.
func (e DowngradeAttemptedError) Error() string {
	const format = "attempted downgrading storage %q from generation %d to %d"

	return fmt.Sprintf(format, e.Storage, e.CurrentGeneration, e.AttemptedGeneration)
}
// Sentinel errors returned by the repository store. Compare against them with errors.Is.
var (
// ErrNoRowsAffected is returned when a query did not perform any changes.
ErrNoRowsAffected = errors.New("no rows were affected by the query")
// ErrRepositoryAlreadyExists is returned when trying to insert a repository that already exists
// in the datastore.
ErrRepositoryAlreadyExists = errors.New("repository already exists")
// ErrRepositoryNotFound is returned when looking up a repository that does not exist in the datastore.
ErrRepositoryNotFound = errors.New("repository not found")
)
// RepositoryStore provides access to repository state.
type RepositoryStore interface {
// GetGeneration gets the repository's generation on a given storage.
GetGeneration(ctx context.Context, repositoryID int64, storage string) (int, error)
// IncrementGeneration increments the generations of up to date nodes.
IncrementGeneration(ctx context.Context, repositoryID int64, primary string, secondaries []string) error
// SetGeneration sets the repository's generation on the given storage. If the generation is higher
// than the virtual storage's generation, it is set to match as well to guarantee monotonic increments.
SetGeneration(ctx context.Context, repositoryID int64, storage, relativePath string, generation int) error
// GetReplicaPath gets the replica path of a repository. Returns an ErrRepositoryNotFound if a record
// for the repository ID is not found.
GetReplicaPath(ctx context.Context, repositoryID int64) (string, error)
// GetReplicatedGeneration returns the generation propagated by applying the replication. If the generation would
// downgrade, a DowngradeAttemptedError is returned.
GetReplicatedGeneration(ctx context.Context, repositoryID int64, source, target string) (int, error)
// CreateRepository creates a record for a repository in the specified virtual storage and relative path.
// Primary is the storage the repository was created on. UpdatedSecondaries are secondaries that participated
// and successfully completed the transaction. OutdatedSecondaries are secondaries that were outdated or failed
// the transaction. Returns ErrRepositoryAlreadyExists when trying to create a repository which already exists in the store.
//
// storePrimary should be set when repository specific primaries are enabled. When set, the primary is stored as
// the repository's primary.
//
// storeAssignments should be set when variable replication factor is enabled. When set, the primary and the
// secondaries are stored as the assigned hosts of the repository.
CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error
// SetAuthoritativeReplica sets the given replica of a repository as the authoritative one by setting its generation as the latest one.
SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storage string) error
// DeleteRepository deletes the database records associated with the repository. It returns the replica path and the storages
// which are known to have a replica at the time of deletion. ErrRepositoryNotFound is returned when
// the repository is not tracked by the Praefect datastore.
DeleteRepository(ctx context.Context, virtualStorage, relativePath string) (string, []string, error)
// DeleteAllRepositories deletes the database records associated with
// repositories in the specified virtual storage.
DeleteAllRepositories(ctx context.Context, virtualStorage string) error
// DeleteReplica deletes a replica of a repository from a storage without affecting other state in the virtual storage.
DeleteReplica(ctx context.Context, repositoryID int64, storage string) error
// GetConsistentStoragesByRepositoryID returns the replica path and the set of up to date storages for the given repository keyed by repository ID.
GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, *datastructure.Set[string], error)
// ConsistentStoragesGetter is declared outside of this file. NOTE(review): presumably it contributes
// GetConsistentStorages, which PostgresRepositoryStore implements — confirm against its declaration.
ConsistentStoragesGetter
// RepositoryExists returns whether the repository exists on a virtual storage.
RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error)
// GetPartiallyAvailableRepositories returns information on repositories which have assigned replicas which
// are not able to serve requests at the moment.
GetPartiallyAvailableRepositories(ctx context.Context, virtualStorage string) ([]RepositoryMetadata, error)
// DeleteInvalidRepository is a method for deleting records of invalid repositories. It deletes a storage's
// record of the invalid repository. If the storage was the only storage with the repository, the repository's
// record on the virtual storage is also deleted.
DeleteInvalidRepository(ctx context.Context, repositoryID int64, storage string) error
// ReserveRepositoryID reserves an ID for a repository that is about to be created and returns it. If a repository already
// exists with the given virtual storage and relative path combination, an error is returned.
ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error)
// GetRepositoryID gets the ID of the repository identified via the given virtual storage and relative path. Returns an
// ErrRepositoryNotFound error if the repository doesn't exist.
GetRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error)
// GetRepositoryMetadata retrieves a repository's metadata.
GetRepositoryMetadata(ctx context.Context, repositoryID int64) (RepositoryMetadata, error)
// GetRepositoryMetadataByPath retrieves a repository's metadata by its virtual path.
GetRepositoryMetadataByPath(ctx context.Context, virtualStorage, relativePath string) (RepositoryMetadata, error)
// MarkUnverified marks replicas of the repository unverified. It returns the number of replicas affected.
MarkUnverified(ctx context.Context, repositoryID int64) (int64, error)
// MarkVirtualStorageUnverified marks all replicas on the virtual storage as unverified. It returns the
// number of replicas affected.
MarkVirtualStorageUnverified(ctx context.Context, virtualStorage string) (int64, error)
// MarkStorageUnverified marks all replicas on the storage as unverified. It returns the number of
// replicas affected.
MarkStorageUnverified(ctx context.Context, virtualStorage, storage string) (int64, error)
// ListRepositoryPaths retrieves the relative path for all repositories present on the given virtual storage.
ListRepositoryPaths(ctx context.Context, virtualStorage string) ([]string, error)
}
// PostgresRepositoryStore is a Postgres implementation of RepositoryStore.
// Refer to the interface for method documentation.
type PostgresRepositoryStore struct {
// db is the handle every query of the store is executed through.
db glsql.Querier
// storages holds the configured storages keyed by the virtual storage they belong to.
storages
}
// NewPostgresRepositoryStore builds a RepositoryStore backed by Postgres. Queries are run through db and
// configuredStorages lists the storages of each virtual storage keyed by the virtual storage's name.
func NewPostgresRepositoryStore(db glsql.Querier, configuredStorages map[string][]string) *PostgresRepositoryStore {
	store := new(PostgresRepositoryStore)
	store.db = db
	store.storages = storages(configuredStorages)

	return store
}
// MarkUnverified clears the verification timestamp of every replica of the repository that currently
// has one. It returns the number of replicas that were changed.
func (rs *PostgresRepositoryStore) MarkUnverified(ctx context.Context, repositoryID int64) (int64, error) {
	const query = `
UPDATE storage_repositories
SET verified_at = NULL
WHERE repository_id = $1
AND verified_at IS NOT NULL
`

	res, execErr := rs.db.ExecContext(ctx, query, repositoryID)
	if execErr != nil {
		return 0, fmt.Errorf("query: %w", execErr)
	}

	return res.RowsAffected()
}
// MarkVirtualStorageUnverified clears the verification timestamp of every verified replica that belongs
// to a repository on the virtual storage. It returns the number of replicas that were changed.
func (rs *PostgresRepositoryStore) MarkVirtualStorageUnverified(ctx context.Context, virtualStorage string) (int64, error) {
	const query = `
UPDATE storage_repositories
SET verified_at = NULL
FROM repositories
WHERE repositories.virtual_storage = $1
AND repositories.repository_id = storage_repositories.repository_id
AND verified_at IS NOT NULL
`

	res, execErr := rs.db.ExecContext(ctx, query, virtualStorage)
	if execErr != nil {
		return 0, fmt.Errorf("query: %w", execErr)
	}

	return res.RowsAffected()
}
// MarkStorageUnverified clears the verification timestamp of every verified replica stored on the given
// storage of the virtual storage. It returns the number of replicas that were changed.
func (rs *PostgresRepositoryStore) MarkStorageUnverified(ctx context.Context, virtualStorage, storage string) (int64, error) {
	const query = `
UPDATE storage_repositories
SET verified_at = NULL
FROM repositories
WHERE repositories.repository_id = storage_repositories.repository_id
AND repositories.virtual_storage = $1
AND storage_repositories.storage = $2
AND verified_at IS NOT NULL
`

	res, execErr := rs.db.ExecContext(ctx, query, virtualStorage, storage)
	if execErr != nil {
		return 0, fmt.Errorf("query: %w", execErr)
	}

	return res.RowsAffected()
}
// GetGeneration looks up the generation of the repository's replica on the given storage. If the storage
// has no record of the repository, GenerationUnknown is returned without an error.
func (rs *PostgresRepositoryStore) GetGeneration(ctx context.Context, repositoryID int64, storage string) (int, error) {
	var generation int

	err := rs.db.QueryRowContext(ctx, `
SELECT generation
FROM storage_repositories
WHERE repository_id = $1
AND storage = $2
`, repositoryID, storage).Scan(&generation)

	switch {
	case err == nil:
		return generation, nil
	case errors.Is(err, sql.ErrNoRows):
		// A missing record is not a failure, the replica just has no known generation.
		return GenerationUnknown, nil
	default:
		return 0, err
	}
}
// IncrementGeneration increments the generation of the replicas on the given primary and secondaries
// which are on the repository's current generation. If at least one replica was incremented, the
// generation of the repository itself is incremented as well.
//
// ErrRepositoryNotFound is returned if there is no record of the repository. errWriteToOutdatedNodes is
// returned if the repository exists but none of the given storages had an up to date replica.
func (rs *PostgresRepositoryStore) IncrementGeneration(ctx context.Context, repositoryID int64, primary string, secondaries []string) error {
	const q = `
WITH updated_replicas AS (
UPDATE storage_repositories
SET generation = generation + 1
FROM (
SELECT repository_id, storage
FROM repositories
JOIN storage_repositories USING (repository_id, generation)
WHERE repository_id = $1
AND storage = ANY($2)
FOR UPDATE
) AS to_update
WHERE storage_repositories.repository_id = to_update.repository_id
AND storage_repositories.storage = to_update.storage
RETURNING storage_repositories.repository_id
),
updated_repository AS (
UPDATE repositories
SET generation = generation + 1
FROM (
SELECT DISTINCT repository_id
FROM updated_replicas
) AS updated_repositories
WHERE repositories.repository_id = updated_repositories.repository_id
)
SELECT
EXISTS (
SELECT FROM repositories
WHERE repository_id = $1
) AS repository_exists,
EXISTS ( SELECT FROM updated_replicas ) AS repository_updated
`

	// Collect the storages into a slice owned by this function. Appending the primary directly to
	// the caller's secondaries could write into the caller's backing array if it has spare capacity.
	candidates := make([]string, 0, len(secondaries)+1)
	candidates = append(candidates, secondaries...)
	candidates = append(candidates, primary)

	var repositoryExists, repositoryUpdated bool
	if err := rs.db.QueryRowContext(
		ctx, q, repositoryID, candidates,
	).Scan(&repositoryExists, &repositoryUpdated); err != nil {
		return fmt.Errorf("scan: %w", err)
	}

	if !repositoryExists {
		return ErrRepositoryNotFound
	}

	if !repositoryUpdated {
		return errWriteToOutdatedNodes
	}

	return nil
}
// SetGeneration records the given generation for the repository's replica on the storage, creating the
// replica's record if it does not exist yet and overwriting its relative path and generation if it does.
// The repository's own generation is raised to the given generation if it is unset or lower. Nothing is
// written if there is no record of the repository.
func (rs *PostgresRepositoryStore) SetGeneration(ctx context.Context, repositoryID int64, storage, relativePath string, generation int) error {
	if _, err := rs.db.ExecContext(ctx, `
WITH repository AS (
UPDATE repositories SET generation = $3
WHERE repository_id = $1
AND COALESCE(repositories.generation, -1) < $3
)
INSERT INTO storage_repositories (
repository_id,
virtual_storage,
relative_path,
storage,
generation
)
SELECT
repository_id,
virtual_storage,
$4,
$2,
$3
FROM repositories
WHERE repository_id = $1
ON CONFLICT (repository_id, storage) DO UPDATE SET
relative_path = EXCLUDED.relative_path,
generation = EXCLUDED.generation
`, repositoryID, storage, generation, relativePath); err != nil {
		return err
	}

	return nil
}
// SetAuthoritativeReplica makes the replica on the given storage the authoritative one. It increments
// the repository's generation and stores the new generation for the replica, creating the replica's
// record if needed. ErrRepositoryNotFound is returned if no rows were affected.
func (rs *PostgresRepositoryStore) SetAuthoritativeReplica(ctx context.Context, virtualStorage, relativePath, storageName string) error {
	const query = `
WITH updated_repository AS (
UPDATE repositories
SET generation = generation + 1
WHERE virtual_storage = $1
AND relative_path = $2
RETURNING repository_id, virtual_storage, relative_path, generation
)
INSERT INTO storage_repositories (repository_id, virtual_storage, relative_path, storage, generation)
SELECT repository_id, virtual_storage, relative_path, $3, generation
FROM updated_repository
ON CONFLICT (virtual_storage, relative_path, storage) DO UPDATE
SET repository_id = EXCLUDED.repository_id,
generation = EXCLUDED.generation
`

	res, err := rs.db.ExecContext(ctx, query, virtualStorage, relativePath, storageName)
	if err != nil {
		return fmt.Errorf("exec: %w", err)
	}

	affected, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("rows affected: %w", err)
	}

	if affected == 0 {
		return ErrRepositoryNotFound
	}

	return nil
}
// GetReplicatedGeneration returns the generation the target storage would be on after replicating the
// repository from the source storage, which is the source's generation. A storage without a record of
// the repository is treated as being on GenerationUnknown. If the target has a known generation that is
// equal to or higher than the source's, a DowngradeAttemptedError is returned instead.
func (rs *PostgresRepositoryStore) GetReplicatedGeneration(ctx context.Context, repositoryID int64, source, target string) (int, error) {
	rows, err := rs.db.QueryContext(ctx, `
SELECT storage, generation
FROM storage_repositories
WHERE repository_id = $1
AND storage = ANY($2)
`, repositoryID, []string{source, target})
	if err != nil {
		return 0, err
	}
	defer rows.Close()

	sourceGen, targetGen := GenerationUnknown, GenerationUnknown

	for rows.Next() {
		var (
			name string
			gen  int
		)

		if scanErr := rows.Scan(&name, &gen); scanErr != nil {
			return 0, scanErr
		}

		// The source is checked first so it wins if source and target name the same storage.
		if name == source {
			sourceGen = gen
		} else if name == target {
			targetGen = gen
		} else {
			return 0, fmt.Errorf("unexpected storage: %s", name)
		}
	}

	if iterErr := rows.Err(); iterErr != nil {
		return 0, iterErr
	}

	targetKnown := targetGen != GenerationUnknown
	if targetKnown && targetGen >= sourceGen {
		return 0, DowngradeAttemptedError{
			Storage:             target,
			CurrentGeneration:   targetGen,
			AttemptedGeneration: sourceGen,
		}
	}

	return sourceGen, nil
}
// CreateRepository inserts the records of a new repository with the given ID, virtual storage, relative
// path and replica path. The primary and the updatedSecondaries get a replica record on generation 0.
// The outdatedSecondaries get no replica record.
//
// If storePrimary is set, the primary is stored as the repository's primary. If storeAssignments is set,
// the primary and all of the secondaries, updated and outdated, are stored as the assigned hosts of the
// repository.
//
// ErrRepositoryAlreadyExists is returned if a replica record conflicts with an existing one. A different
// error is returned if the repository ID itself is already in use.
func (rs *PostgresRepositoryStore) CreateRepository(ctx context.Context, repositoryID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error {
	const query = `
WITH repo AS (
INSERT INTO repositories (
repository_id,
virtual_storage,
relative_path,
replica_path,
generation,
"primary"
) VALUES ($8, $1, $2, $9, 0, CASE WHEN $4 THEN $3 END)
),
assignments AS (
INSERT INTO repository_assignments (
repository_id,
virtual_storage,
relative_path,
storage
)
SELECT $8, $1, $2, storage
FROM (
SELECT $3 AS storage
UNION
SELECT unnest($5::text[])
UNION
SELECT unnest($6::text[])
) AS storages
WHERE $7
)
INSERT INTO storage_repositories (
repository_id,
virtual_storage,
relative_path,
storage,
generation
)
SELECT $8, $1, $2, storage, 0
FROM (
SELECT $3 AS storage
UNION
SELECT unnest($5::text[])
) AS updated_storages
`

	// The arguments are listed in placeholder order, from $1 to $9.
	if _, err := rs.db.ExecContext(ctx, query,
		virtualStorage,
		relativePath,
		primary,
		storePrimary,
		updatedSecondaries,
		outdatedSecondaries,
		storeAssignments,
		repositoryID,
		replicaPath,
	); err != nil {
		switch {
		case glsql.IsUniqueViolation(err, "repositories_pkey"):
			return fmt.Errorf("repository id %d already in use", repositoryID)
		case glsql.IsUniqueViolation(err, "storage_repositories_pkey"),
			glsql.IsUniqueViolation(err, "storage_repositories_new_pkey"):
			return ErrRepositoryAlreadyExists
		default:
			return err
		}
	}

	return nil
}
// DeleteRepository deletes the repository's record and returns its replica path along with the storages
// that had a replica record joined to it. ErrRepositoryNotFound is returned if no repository matched
// the virtual storage and relative path.
func (rs *PostgresRepositoryStore) DeleteRepository(ctx context.Context, virtualStorage, relativePath string) (string, []string, error) {
	const query = `
WITH repository AS (
DELETE FROM repositories
WHERE virtual_storage = $1
AND relative_path = $2
RETURNING repository_id, replica_path
)
SELECT replica_path, ARRAY_AGG(storage_repositories.storage)
FROM repository
LEFT JOIN storage_repositories USING (repository_id)
GROUP BY replica_path
`

	var (
		deletedPath     string
		replicaStorages glsql.StringArray
	)

	err := rs.db.QueryRowContext(ctx, query, virtualStorage, relativePath).Scan(&deletedPath, &replicaStorages)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return "", nil, ErrRepositoryNotFound
	case err != nil:
		return "", nil, fmt.Errorf("scan: %w", err)
	}

	return deletedPath, replicaStorages.Slice(), nil
}
// DeleteAllRepositories deletes the records of every repository in the virtual storage. The replication
// jobs of the virtual storage, their job locks and the replication queue locks whose ID is prefixed with
// the virtual storage are deleted in the same statement.
func (rs *PostgresRepositoryStore) DeleteAllRepositories(ctx context.Context, virtualStorage string) error {
	const query = `
WITH delete_jobs AS (
DELETE FROM replication_queue
WHERE job->>'virtual_storage' = $1
RETURNING id
),
delete_job_locks AS (
DELETE FROM replication_queue_job_lock
USING delete_jobs
WHERE job_id = delete_jobs.id
),
delete_locks AS (
DELETE FROM replication_queue_lock
WHERE id LIKE $1 || '|%|%'
)
DELETE FROM repositories
WHERE virtual_storage = $1;
`

	_, err := rs.db.ExecContext(ctx, query, virtualStorage)

	return err
}
// DeleteReplica removes the repository's replica record on the given storage from `storage_repositories`.
// ErrNoRowsAffected is returned if there was no such record to delete.
func (rs *PostgresRepositoryStore) DeleteReplica(ctx context.Context, repositoryID int64, storage string) error {
	const query = `
DELETE FROM storage_repositories
WHERE repository_id = $1
AND storage = $2
`

	res, err := rs.db.ExecContext(ctx, query, repositoryID, storage)
	if err != nil {
		return err
	}

	deleted, err := res.RowsAffected()
	if err != nil {
		return err
	}

	if deleted == 0 {
		return ErrNoRowsAffected
	}

	return nil
}
// GetConsistentStoragesByRepositoryID looks the repository up by its ID and returns its replica path
// together with the set of storages whose replica is on the repository's generation.
func (rs *PostgresRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, *datastructure.Set[string], error) {
	const byRepositoryID = `
SELECT replica_path, ARRAY_AGG(storage)
FROM repositories
JOIN storage_repositories USING (repository_id, relative_path, generation)
WHERE repository_id = $1
GROUP BY replica_path
`

	return rs.getConsistentStorages(ctx, byRepositoryID, repositoryID)
}
// GetConsistentStorages looks the repository up by its virtual storage and relative path and returns its
// replica path together with the set of storages whose replica is on the repository's generation.
func (rs *PostgresRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) {
	const byVirtualPath = `
SELECT replica_path, ARRAY_AGG(storage)
FROM repositories
JOIN storage_repositories USING (repository_id, relative_path, generation)
WHERE repositories.virtual_storage = $1
AND repositories.relative_path = $2
GROUP BY replica_path
`

	return rs.getConsistentStorages(ctx, byVirtualPath, virtualStorage, relativePath)
}
// getConsistentStorages runs the given query, which must yield a replica path and an array of storages,
// and converts the result. It lets the consistent storages be queried by different keys.
// ErrRepositoryNotFound is returned if the query yields no rows.
func (rs *PostgresRepositoryStore) getConsistentStorages(ctx context.Context, query string, params ...interface{}) (string, *datastructure.Set[string], error) {
	var (
		path       string
		consistent glsql.StringArray
	)

	err := rs.db.QueryRowContext(ctx, query, params...).Scan(&path, &consistent)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return "", nil, ErrRepositoryNotFound
	case err != nil:
		return "", nil, fmt.Errorf("query: %w", err)
	}

	return path, datastructure.SetFromSlice(consistent.Slice()), nil
}
// RepositoryExists reports whether there is a record of a repository with the given relative path in the
// virtual storage.
func (rs *PostgresRepositoryStore) RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error) {
	var found bool

	err := rs.db.QueryRowContext(ctx, `
SELECT true
FROM repositories
WHERE virtual_storage = $1
AND relative_path = $2
`, virtualStorage, relativePath).Scan(&found)

	// No matching row simply means the repository does not exist.
	if errors.Is(err, sql.ErrNoRows) {
		return false, nil
	}

	if err != nil {
		return false, err
	}

	return found, nil
}
// DeleteInvalidRepository removes the repository's replica record on the given storage. If no other
// storage has a replica record of the repository, the repository's own record is removed too.
func (rs *PostgresRepositoryStore) DeleteInvalidRepository(ctx context.Context, repositoryID int64, storage string) error {
	const query = `
WITH repository AS (
SELECT repository_id
FROM repositories
WHERE repository_id = $1
FOR UPDATE
),
invalid_repository AS (
DELETE FROM storage_repositories
USING repository
WHERE storage_repositories.repository_id = repository.repository_id
AND storage = $2
)
DELETE FROM repositories
USING repository
WHERE repositories.repository_id = repository.repository_id
AND NOT EXISTS (
SELECT 1
FROM storage_repositories
WHERE repository_id = $1
AND storage != $2
)
`

	if _, err := rs.db.ExecContext(ctx, query, repositoryID, storage); err != nil {
		return err
	}

	return nil
}
// Replica represents a replica of a repository. The field names match the keys of the per-replica JSON
// objects built by the query in getRepositoryMetadata, which is how the values get populated.
type Replica struct {
// Storage is the name of the replica's storage.
Storage string
// Generation is the replica's confirmed generation. If the replica does not exist yet, generation
// is -1.
Generation int64
// Assigned indicates whether the storage is an assigned host of the repository.
Assigned bool
// Healthy indicates whether the replica is considered healthy by the consensus of Praefect nodes.
Healthy bool
// ValidPrimary indicates whether the replica is ready to serve as the primary if necessary.
ValidPrimary bool
// VerifiedAt is the last successful verification time of the replica.
VerifiedAt time.Time
}
// RepositoryMetadata contains the repository's metadata. The field names match the keys of the JSON
// object built by the query in getRepositoryMetadata, which is how the values get populated.
type RepositoryMetadata struct {
// RepositoryID is the internal id of the repository.
RepositoryID int64
// VirtualStorage is the virtual storage where the repository is.
VirtualStorage string
// RelativePath is the relative path of the repository.
RelativePath string
// ReplicaPath is the actual disk location where the replicas are stored in the storages.
ReplicaPath string
// Primary is the current primary of this repository.
Primary string
// Generation is the current generation of the repository.
Generation int64
// Replicas contains information of the repository on each storage that contains the repository
// or does not contain the repository but is assigned to host it.
Replicas []Replica
}
// GetRepositoryMetadata fetches the metadata of the repository with the given ID. ErrRepositoryNotFound
// is returned if the lookup yields no repository.
func (rs *PostgresRepositoryStore) GetRepositoryMetadata(ctx context.Context, repositoryID int64) (RepositoryMetadata, error) {
	// The same filter applies to both the repositories and the valid primaries.
	const byRepositoryID = "WHERE repository_id = $3"

	results, err := rs.getRepositoryMetadata(ctx, byRepositoryID, byRepositoryID, "", repositoryID)
	if err != nil {
		return RepositoryMetadata{}, err
	}

	if len(results) > 0 {
		return results[0], nil
	}

	return RepositoryMetadata{}, ErrRepositoryNotFound
}
// GetRepositoryMetadataByPath fetches the metadata of the repository identified by the virtual storage
// and relative path. ErrRepositoryNotFound is returned if the lookup yields no repository.
func (rs *PostgresRepositoryStore) GetRepositoryMetadataByPath(ctx context.Context, virtualStorage, relativePath string) (RepositoryMetadata, error) {
	const (
		repositoriesByPath = "WHERE virtual_storage = $3 AND relative_path = $4"
		// The valid primaries are narrowed down through the already filtered repositories.
		validPrimariesOfRepository = "WHERE repository_id = (SELECT repository_id FROM repositories)"
	)

	results, err := rs.getRepositoryMetadata(
		ctx, repositoriesByPath, validPrimariesOfRepository, "", virtualStorage, relativePath,
	)
	if err != nil {
		return RepositoryMetadata{}, err
	}

	if len(results) > 0 {
		return results[0], nil
	}

	return RepositoryMetadata{}, ErrRepositoryNotFound
}
// GetPartiallyAvailableRepositories lists the metadata of the virtual storage's repositories which have
// at least one assigned replica that is not a valid primary. An error is returned if the virtual
// storage is not among the configured ones.
func (rs *PostgresRepositoryStore) GetPartiallyAvailableRepositories(ctx context.Context, virtualStorage string) ([]RepositoryMetadata, error) {
	if _, configured := rs.storages[virtualStorage]; !configured {
		return nil, fmt.Errorf("unknown virtual storage: %q", virtualStorage)
	}

	const (
		byVirtualStorage = "WHERE virtual_storage = $3"
		// Keeps only the repositories with an assigned storage that is not a valid primary.
		hasUnavailableAssigned = "HAVING bool_or(NOT valid_primaries.storage IS NOT NULL) FILTER(WHERE assigned)"
	)

	return rs.getRepositoryMetadata(ctx, byVirtualStorage, byVirtualStorage, hasUnavailableAssigned, virtualStorage)
}
// getRepositoryMetadata queries the metadata of the repositories selected by the given SQL fragments.
// repositoriesFilter and validPrimariesFilter are spliced into the query as the filters of the
// `repositories` and `valid_primaries` CTEs, and groupFilter is spliced in right after the GROUP BY
// clause. The fragments can refer to filterArgs through placeholders starting from $3, as $1 and $2 are
// taken by the configured virtual storages and storages. The results are ordered by repository ID.
func (rs *PostgresRepositoryStore) getRepositoryMetadata(ctx context.Context, repositoriesFilter, validPrimariesFilter, groupFilter string, filterArgs ...interface{}) ([]RepositoryMetadata, error) {
// The query below gets the status of every repository matched by the filters. When called with the
// group filter of GetPartiallyAvailableRepositories, these are the repositories which have one or more
// assigned replicas that are not able to serve requests at the moment. The status includes how many
// changes a replica is behind, whether the replica is assigned host or not, whether the replica is
// healthy and whether the replica is considered a valid primary candidate. It works as follows:
//
// 1. First we get all the storages which contain the repository from `storage_repositories`. We
// list every copy of the repository as the latest generation could exist on an unassigned
// storage.
//
// 2. We join `repository_assignments` table with fallback behavior in case the repository has no
// assignments. A storage is considered assigned if:
//
// 1. If the repository has no assignments, every configured storage is considered assigned.
// 2. If the repository has assignments, the storage needs to be assigned explicitly.
// 3. Assignments of unconfigured storages are treated as if they don't exist.
//
// If none of the assigned storages are outdated, the repository is not considered outdated as
// the desired replication factor has been reached.
//
// 3. We join `repositories` table to filter out any repositories that have been deleted but still
// exist on some storages. While the `repository_assignments` has a foreign key on `repositories`
// and there can't be any assignments for deleted repositories, this is still needed as long as the
// fallback behavior of no assignments is in place.
//
// 4. We join the `healthy_storages` view to return the storages current health.
//
// 5. We join the `valid_primaries` view to return whether the storage is ready to act as a primary in case
// of a failover.
//
// 6. Finally we aggregate each repository's information in to a single row with a JSON object containing
// the information. This allows us to group the output already in the query and makes scanning easier
// We filter out groups which do not have an assigned storage as the replication factor on those
// is reached. Status of unassigned storages does not matter as long as they don't contain a later generation
// than the assigned ones.
//
var (
virtualStorages []string
storages []string
)
// Flatten the configured storages into two parallel slices, one entry per (virtual storage, storage)
// pair, so the query can unnest them side by side into the rows of `configured_storages`.
for virtualStorage, configuredStorages := range rs.storages {
for _, storage := range configuredStorages {
virtualStorages = append(virtualStorages, virtualStorage)
storages = append(storages, storage)
}
}
// The configured storages always take $1 and $2, the caller's filter arguments follow from $3 onwards.
args := append([]interface{}{virtualStorages, storages}, filterArgs...)
rows, err := rs.db.QueryContext(ctx, fmt.Sprintf(`
WITH configured_storages AS (
SELECT unnest($1::text[]) AS virtual_storage,
unnest($2::text[]) AS storage
),
repositories AS (
SELECT *
FROM repositories
%s
),
storage_repositories AS (
SELECT repository_id, storage, storage_repositories.generation, verified_at
FROM repositories
JOIN storage_repositories USING (repository_id)
),
valid_primaries AS (
SELECT repository_id, storage
FROM valid_primaries
%s
)
SELECT
json_build_object (
'RepositoryID', repository_id,
'VirtualStorage', virtual_storage,
'RelativePath', relative_path,
'ReplicaPath', replica_path,
'Primary', "primary",
'Generation', repositories.generation,
'Replicas', json_agg(
json_build_object(
'Storage', storage,
'Generation', COALESCE(replicas.generation, -1),
'Assigned', assigned,
'Healthy', healthy_storages.storage IS NOT NULL,
'ValidPrimary', valid_primaries.storage IS NOT NULL,
'VerifiedAt', verified_at
)
)
)
FROM repositories
JOIN (
SELECT
repository_id,
storage,
generation,
repository_assignments.storage IS NOT NULL AS assigned,
verified_at
FROM storage_repositories
FULL JOIN (
SELECT repository_id, storage
FROM repositories
JOIN configured_storages USING (virtual_storage)
WHERE (
SELECT COUNT(*) = 0 OR COUNT(*) FILTER (WHERE storage = configured_storages.storage) = 1
FROM repository_assignments
WHERE repository_id = repositories.repository_id
AND (virtual_storage, storage) IN (SELECT * FROM configured_storages)
)
) AS repository_assignments USING (repository_id, storage)
ORDER BY repository_id, storage
) AS replicas USING (repository_id)
LEFT JOIN healthy_storages USING (virtual_storage, storage)
LEFT JOIN valid_primaries USING (repository_id, storage)
GROUP BY repository_id, virtual_storage, relative_path, replica_path, "primary", repositories.generation
%s
ORDER BY repository_id
`, repositoriesFilter, validPrimariesFilter, groupFilter), args...)
if err != nil {
return nil, fmt.Errorf("query: %w", err)
}
defer rows.Close()
var repos []RepositoryMetadata
for rows.Next() {
// Each row carries a single JSON document describing one repository and its replicas.
var repositoryJSON string
if err := rows.Scan(&repositoryJSON); err != nil {
return nil, fmt.Errorf("scan: %w", err)
}
var repo RepositoryMetadata
if err := json.NewDecoder(strings.NewReader(repositoryJSON)).Decode(&repo); err != nil {
return nil, fmt.Errorf("decode json: %w", err)
}
repos = append(repos, repo)
}
return repos, rows.Err()
}
// ReserveRepositoryID draws the next value from the repository ID sequence for a repository that is
// about to be created. No ID is drawn and ErrRepositoryAlreadyExists is returned if a repository with
// the given virtual storage and relative path already exists.
func (rs *PostgresRepositoryStore) ReserveRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) {
	const query = `
SELECT nextval('repositories_repository_id_seq')
WHERE NOT EXISTS (
SELECT FROM repositories
WHERE virtual_storage = $1
AND relative_path = $2
)
`

	var reserved int64

	err := rs.db.QueryRowContext(ctx, query, virtualStorage, relativePath).Scan(&reserved)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		// The query yields no row when the repository is already present.
		return 0, ErrRepositoryAlreadyExists
	case err != nil:
		return 0, fmt.Errorf("scan: %w", err)
	}

	return reserved, nil
}
// GetRepositoryID resolves the virtual storage and relative path of a repository to the repository's
// ID. ErrRepositoryNotFound is returned if there is no such repository.
func (rs *PostgresRepositoryStore) GetRepositoryID(ctx context.Context, virtualStorage, relativePath string) (int64, error) {
	const query = `
SELECT repository_id
FROM repositories
WHERE virtual_storage = $1
AND relative_path = $2
`

	var repositoryID int64

	err := rs.db.QueryRowContext(ctx, query, virtualStorage, relativePath).Scan(&repositoryID)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return 0, ErrRepositoryNotFound
	case err != nil:
		return 0, fmt.Errorf("scan: %w", err)
	}

	return repositoryID, nil
}
// GetReplicaPath looks up the replica path stored for the repository. ErrRepositoryNotFound is returned
// if there is no record for the repository ID.
func (rs *PostgresRepositoryStore) GetReplicaPath(ctx context.Context, repositoryID int64) (string, error) {
	const query = "SELECT replica_path FROM repositories WHERE repository_id = $1"

	var path string

	err := rs.db.QueryRowContext(ctx, query, repositoryID).Scan(&path)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return "", ErrRepositoryNotFound
	case err != nil:
		return "", fmt.Errorf("scan: %w", err)
	}

	return path, nil
}
// ListRepositoryPaths collects the relative paths of all repositories recorded for the given virtual
// storage.
func (rs *PostgresRepositoryStore) ListRepositoryPaths(ctx context.Context, virtualStorage string) ([]string, error) {
	const query = `
SELECT relative_path
FROM repositories
WHERE virtual_storage = $1
`

	rows, err := rs.db.QueryContext(ctx, query, virtualStorage)
	if err != nil {
		return nil, fmt.Errorf("query: %w", err)
	}
	defer rows.Close()

	var paths []string

	for rows.Next() {
		var path string

		if scanErr := rows.Scan(&path); scanErr != nil {
			return nil, fmt.Errorf("scan: %w", scanErr)
		}

		paths = append(paths, path)
	}

	// Iteration errors are only surfaced once the loop has ended.
	return paths, rows.Err()
}