in internal/praefect/datastore/repository_store.go [718:861]
func (rs *PostgresRepositoryStore) getRepositoryMetadata(ctx context.Context, repositoriesFilter, validPrimariesFilter, groupFilter string, filterArgs ...interface{}) ([]RepositoryMetadata, error) {
// The query below gets the status of every repository which has one or more assigned replicas that
// are not able to serve requests at the moment. The status includes how many changes a replica is behind,
// whether the replica is an assigned host or not, whether the replica is healthy, and whether the replica is
// considered a valid primary candidate. It works as follows:
//
// 1. First we get all the storages which contain the repository from `storage_repositories`. We
// list every copy of the repository as the latest generation could exist on an unassigned
// storage.
//
// 2. We join the `repository_assignments` table with fallback behavior in case the repository has no
// assignments. A storage is considered assigned as follows:
//
// 1. If the repository has no assignments, every configured storage is considered assigned.
// 2. If the repository has assignments, the storage needs to be assigned explicitly.
// 3. Assignments of unconfigured storages are treated as if they don't exist.
//
// If none of the assigned storages are outdated, the repository is not considered outdated as
// the desired replication factor has been reached.
//
// 3. We join the `repositories` table to filter out any repositories that have been deleted but still
// exist on some storages. While the `repository_assignments` table has a foreign key on `repositories`
// and there can't be any assignments for deleted repositories, this is still needed as long as the
// fallback behavior of no assignments is in place.
//
// 4. We join the `healthy_storages` view to return the storage's current health.
//
// 5. We join the `valid_primaries` view to return whether the storage is ready to act as a primary in case
// of a failover.
//
// 6. Finally, we aggregate each repository's information into a single row with a JSON object containing
// the information. This allows us to group the output already in the query and makes scanning easier.
// We filter out groups which do not have an outdated assigned storage, as the replication factor on those
// is reached. Status of unassigned storages does not matter as long as they don't contain a later generation
// than the assigned ones.
//
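// For illustration (hypothetical storage names): if "gitaly-1" and "gitaly-2" are the configured
// storages and a repository has no rows in `repository_assignments`, both storages count as
// assigned. If the repository is assigned only to "gitaly-1", then only "gitaly-1" counts as
// assigned, and an outdated copy on "gitaly-2" alone does not make the repository outdated.
//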
var (
virtualStorages []string
storages []string
)
for virtualStorage, configuredStorages := range rs.storages {
for _, storage := range configuredStorages {
virtualStorages = append(virtualStorages, virtualStorage)
storages = append(storages, storage)
}
}
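// For example (hypothetical configuration), rs.storages = {"default": ["gitaly-1", "gitaly-2"]}
// flattens into virtualStorages = ["default", "default"] and storages = ["gitaly-1", "gitaly-2"];
// the query below zips them back into rows via unnest($1::text[]) and unnest($2::text[]).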
args := append([]interface{}{virtualStorages, storages}, filterArgs...)
rows, err := rs.db.QueryContext(ctx, fmt.Sprintf(`
WITH configured_storages AS (
SELECT unnest($1::text[]) AS virtual_storage,
unnest($2::text[]) AS storage
),
repositories AS (
SELECT *
FROM repositories
%s
),
storage_repositories AS (
SELECT repository_id, storage, storage_repositories.generation, verified_at
FROM repositories
JOIN storage_repositories USING (repository_id)
),
valid_primaries AS (
SELECT repository_id, storage
FROM valid_primaries
%s
)
SELECT
json_build_object (
'RepositoryID', repository_id,
'VirtualStorage', virtual_storage,
'RelativePath', relative_path,
'ReplicaPath', replica_path,
'Primary', "primary",
'Generation', repositories.generation,
'Replicas', json_agg(
json_build_object(
'Storage', storage,
'Generation', COALESCE(replicas.generation, -1),
'Assigned', assigned,
'Healthy', healthy_storages.storage IS NOT NULL,
'ValidPrimary', valid_primaries.storage IS NOT NULL,
'VerifiedAt', verified_at
)
)
)
FROM repositories
JOIN (
SELECT
repository_id,
storage,
generation,
repository_assignments.storage IS NOT NULL AS assigned,
verified_at
FROM storage_repositories
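-- The FULL JOIN below keeps replicas present on only one side: assigned storages that do not
-- yet hold a copy of the repository, and unassigned storages that still hold one.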
FULL JOIN (
SELECT repository_id, storage
FROM repositories
JOIN configured_storages USING (virtual_storage)
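-- Fall back to treating every configured storage as assigned when the repository has no
-- assignment rows among the configured storages; otherwise require an explicit assignment row.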
WHERE (
SELECT COUNT(*) = 0 OR COUNT(*) FILTER (WHERE storage = configured_storages.storage) = 1
FROM repository_assignments
WHERE repository_id = repositories.repository_id
AND (virtual_storage, storage) IN (SELECT * FROM configured_storages)
)
) AS repository_assignments USING (repository_id, storage)
ORDER BY repository_id, storage
) AS replicas USING (repository_id)
LEFT JOIN healthy_storages USING (virtual_storage, storage)
LEFT JOIN valid_primaries USING (repository_id, storage)
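-- A replica is reported as healthy / a valid primary when a matching row exists in the joined
-- view; the IS NOT NULL checks in the JSON object above turn that into booleans.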
GROUP BY repository_id, virtual_storage, relative_path, replica_path, "primary", repositories.generation
%s
ORDER BY repository_id
`, repositoriesFilter, validPrimariesFilter, groupFilter), args...)
if err != nil {
return nil, fmt.Errorf("query: %w", err)
}
defer rows.Close()
var repos []RepositoryMetadata
for rows.Next() {
var repositoryJSON string
if err := rows.Scan(&repositoryJSON); err != nil {
return nil, fmt.Errorf("scan: %w", err)
}
var repo RepositoryMetadata
if err := json.NewDecoder(strings.NewReader(repositoryJSON)).Decode(&repo); err != nil {
return nil, fmt.Errorf("decode json: %w", err)
}
repos = append(repos, repo)
}
return repos, rows.Err()
}
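
// getRepositoryMetadataByIDSketch is a hypothetical, minimal caller of getRepositoryMetadata meant
// only to illustrate the filter placeholders: the fragments are plain SQL, and their arguments are
// numbered from $3 onwards because $1 and $2 carry the configured virtual storages and storages.
func (rs *PostgresRepositoryStore) getRepositoryMetadataByIDSketch(ctx context.Context, repositoryID int64) ([]RepositoryMetadata, error) {
	// Restrict both the repositories CTE and the valid_primaries CTE to the requested repository
	// and apply no group filter.
	return rs.getRepositoryMetadata(ctx,
		"WHERE repository_id = $3",
		"WHERE repository_id = $3",
		"",
		repositoryID,
	)
}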