func()

in internal/praefect/datastore/repository_store.go [718:861]


// getRepositoryMetadata returns the metadata of repositories together with the state of each of
// their replicas. The caller narrows the result with SQL fragments that are spliced into the query:
//
//   - repositoriesFilter is appended to the `repositories` CTE (e.g. a WHERE clause) and selects
//     which repositories are considered at all.
//   - validPrimariesFilter is appended to the `valid_primaries` CTE.
//   - groupFilter is appended directly after the GROUP BY clause (e.g. a HAVING clause) and filters
//     the aggregated per-repository rows.
//
// The fragments are interpolated with fmt.Sprintf, so they must never contain untrusted input; put
// values in filterArgs instead. Placeholders $1 and $2 are reserved for the configured storages, so
// filter fragments must number their parameters starting at $3.
//
// Each returned repository's Replicas lists one entry per storage that either holds a copy of the
// repository or is considered assigned to it, ordered by storage name. Repositories are returned
// ordered by repository ID.
func (rs *PostgresRepositoryStore) getRepositoryMetadata(ctx context.Context, repositoriesFilter, validPrimariesFilter, groupFilter string, filterArgs ...interface{}) ([]RepositoryMetadata, error) {
	// The query below returns, per repository, the state of every relevant replica: its generation
	// (-1 if the storage holds no copy), whether the storage is assigned, whether the storage is
	// healthy and whether it is a valid primary candidate. It works as follows:
	//
	// 1. First we get all the storages which contain the repository from `storage_repositories`. We
	//    list every copy of the repository as the latest generation could exist on an unassigned
	//    storage.
	//
	// 2. We FULL JOIN the assigned storages, with fallback behavior in case the repository has no
	//    assignments. A storage is considered assigned if:
	//
	//    1. If the repository has no assignments, every configured storage is considered assigned.
	//    2. If the repository has assignments, the storage needs to be assigned explicitly.
	//    3. Assignments of unconfigured storages are treated as if they don't exist.
	//
	//    An assigned storage without a copy of the repository still gets a row (generation -1).
	//
	// 3. We join the `repositories` CTE (the `repositories` table narrowed by repositoriesFilter).
	//    This also filters out repositories that have been deleted but still exist on some storages.
	//    While `repository_assignments` has a foreign key on `repositories` and there can't be any
	//    assignments for deleted repositories, this is still needed as long as the fallback behavior
	//    of no assignments is in place.
	//
	// 4. We join the `healthy_storages` view to return each storage's current health.
	//
	// 5. We join the `valid_primaries` CTE (narrowed by validPrimariesFilter) to return whether the
	//    storage is ready to act as a primary in case of a failover.
	//
	// 6. Finally we aggregate each repository's information into a single row with a JSON object
	//    containing the information. This groups the output already in the query and makes scanning
	//    easier. The replicas are aggregated with an explicit ORDER BY: PostgreSQL does not guarantee
	//    that ordering from a subquery survives the subsequent joins and grouping. groupFilter can
	//    then discard whole groups, e.g. repositories whose assigned replicas need no action.

	// Flatten the configured storages into two parallel arrays. They are unnested in lockstep by
	// the `configured_storages` CTE, so index i of both slices forms one (virtual_storage, storage)
	// pair; map iteration order does not matter.
	var (
		virtualStorages []string
		storages        []string
	)

	for virtualStorage, configuredStorages := range rs.storages {
		for _, storage := range configuredStorages {
			virtualStorages = append(virtualStorages, virtualStorage)
			storages = append(storages, storage)
		}
	}

	// $1 and $2 are the configured storages; caller-provided filter arguments follow from $3 on.
	args := append([]interface{}{virtualStorages, storages}, filterArgs...)

	rows, err := rs.db.QueryContext(ctx, fmt.Sprintf(`
WITH configured_storages AS (
	SELECT unnest($1::text[]) AS virtual_storage,
	       unnest($2::text[]) AS storage
),

repositories AS (
	SELECT *
	FROM repositories
	%s
),

storage_repositories AS (
	SELECT repository_id, storage, storage_repositories.generation, verified_at
	FROM repositories
	JOIN storage_repositories USING (repository_id)
),

valid_primaries AS (
	SELECT repository_id, storage
	FROM valid_primaries
	%s
)

SELECT
	json_build_object (
		'RepositoryID', repository_id,
		'VirtualStorage', virtual_storage,
		'RelativePath', relative_path,
		'ReplicaPath', replica_path,
		'Primary', "primary",
		'Generation', repositories.generation,
		'Replicas', json_agg(
			json_build_object(
				'Storage', storage,
				'Generation', COALESCE(replicas.generation, -1),
				'Assigned', assigned,
				'Healthy', healthy_storages.storage IS NOT NULL,
				'ValidPrimary', valid_primaries.storage IS NOT NULL,
				'VerifiedAt', verified_at
			)
			ORDER BY storage
		)
	)
FROM repositories
JOIN (
	SELECT
		repository_id,
		storage,
		generation,
		repository_assignments.storage IS NOT NULL AS assigned,
		verified_at
	FROM storage_repositories
	FULL JOIN (
		SELECT repository_id, storage
		FROM repositories
		JOIN configured_storages USING (virtual_storage)
		WHERE (
			SELECT COUNT(*) = 0 OR COUNT(*) FILTER (WHERE storage = configured_storages.storage) = 1
			FROM repository_assignments
			WHERE repository_id = repositories.repository_id
			AND   (virtual_storage, storage) IN (SELECT * FROM configured_storages)
		)
	) AS repository_assignments USING (repository_id, storage)
) AS replicas USING (repository_id)
LEFT JOIN healthy_storages USING (virtual_storage, storage)
LEFT JOIN valid_primaries USING (repository_id, storage)
GROUP BY repository_id, virtual_storage, relative_path, replica_path, "primary", repositories.generation
%s
ORDER BY repository_id
	`, repositoriesFilter, validPrimariesFilter, groupFilter), args...)
	if err != nil {
		return nil, fmt.Errorf("query: %w", err)
	}
	defer rows.Close()

	var repos []RepositoryMetadata
	for rows.Next() {
		var repositoryJSON string
		if err := rows.Scan(&repositoryJSON); err != nil {
			return nil, fmt.Errorf("scan: %w", err)
		}

		var repo RepositoryMetadata
		if err := json.NewDecoder(strings.NewReader(repositoryJSON)).Decode(&repo); err != nil {
			return nil, fmt.Errorf("decode json: %w", err)
		}

		repos = append(repos, repo)
	}

	// An iteration error means the result set may be truncated; don't hand back a partial slice.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate rows: %w", err)
	}

	return repos, nil
}