in internal/praefect/reconciler/reconciler.go [117:342]
func (r *Reconciler) reconcile(ctx context.Context) error {
	defer prometheus.NewTimer(r.reconciliationSchedulingDuration).ObserveDuration()

	var virtualStorages []string
	var storages []string

	for virtualStorage, healthyStorages := range r.hc.HealthyNodes() {
		if len(healthyStorages) < 2 {
			// At least two healthy storages are needed within a virtual storage to have a
			// valid replication source and target.
			r.log.WithField("virtual_storage", virtualStorage).Info("reconciliation skipped for virtual storage due to not having enough healthy storages")
			continue
		}

		for _, storage := range healthyStorages {
			virtualStorages = append(virtualStorages, virtualStorage)
			storages = append(storages, storage)
		}
	}

	if len(virtualStorages) == 0 {
		return nil
	}
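	// The query below does the scheduling in a single round trip to the database:
	//
	//   1. 'reconciliation_lock' tries to take a transaction scoped advisory lock so that only
	//      one reconciliation run schedules jobs at a time.
	//   2. 'healthy_storages' unpacks the parallel virtualStorages and storages slices built
	//      above into (virtual_storage, storage) rows.
	//   3. 'delete_jobs' and 'update_jobs' work out which replicas should be removed and which
	//      should be replicated to.
	//   4. 'reconciliation_jobs' inserts the resulting jobs into the replication queue, with
	//      'create_locks' creating the queue lock rows they refer to.
	//
	// The inserted jobs are returned so they can be logged further below.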
	rows, err := r.db.QueryContext(ctx, `
WITH reconciliation_lock AS (
	SELECT pg_try_advisory_xact_lock($1) AS acquired
),
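-- healthy_storages expands the parallel text arrays passed in as parameters into
-- (virtual_storage, storage) rows describing the currently healthy storages.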
healthy_storages AS (
	SELECT unnest($2::text[]) AS virtual_storage,
	       unnest($3::text[]) AS storage
),
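-- delete_jobs picks, for each repository, at most one healthy replica that lives on a storage the
-- repository is not assigned to and that can be removed without losing data or interfering with
-- in-flight replication jobs.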
delete_jobs AS (
	SELECT DISTINCT ON (repository_id)
		repository_id,
		virtual_storage,
		relative_path,
		storage
	FROM (
		SELECT repository_id, storage, generation
		FROM storage_repositories
	) AS storage_repositories
	JOIN repositories USING (repository_id)
	JOIN healthy_storages USING (virtual_storage, storage)
	WHERE (
		-- Only replicas on storages the repository is not assigned to are targeted for deletion.
		-- If no assignments exist, every storage is considered assigned and no deletion is
		-- scheduled.
		SELECT COUNT(storage) > 0 AND COUNT(storage) FILTER (WHERE storage = storage_repositories.storage) = 0
		FROM repository_assignments
		WHERE repository_id = repositories.repository_id
	)
	AND storage_repositories.generation <= (
		-- Check that the replica's generation is equal to or lower than the generation of every
		-- other replica and of every assigned storage of the repository; an assigned storage
		-- missing its replica counts as generation -1 and thus blocks deletion. Only then is the
		-- replica eligible for deletion.
		SELECT MIN(COALESCE(generation, -1))
		FROM repository_assignments
		FULL JOIN storage_repositories USING (repository_id, storage)
		WHERE repository_id = repositories.repository_id
	) AND NOT EXISTS (
		-- Ensure the replica is not used as a source or target in any pending replication job, so
		-- that already scheduled jobs are not broken.
		SELECT FROM replication_queue
		WHERE (job->>'repository_id')::bigint = repository_id
		AND (
			job->>'source_node_storage' = storage
			OR job->>'target_node_storage' = storage
		)
		AND state NOT IN ('completed', 'dead')
	) AND NOT EXISTS (
		-- Ensure there are no other active 'delete_replica' jobs for the repository. Rapid changes
		-- to repository assignments could otherwise cause the reconciler to schedule deletions
		-- against all replicas, so at most one 'delete_replica' job is allowed to be active per
		-- repository at any given time.
		SELECT FROM replication_queue
		WHERE state NOT IN ('completed', 'dead')
		AND (job->>'repository_id')::bigint = repository_id
		AND job->>'change' = 'delete_replica'
	)
),
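-- update_jobs pairs each healthy storage that should contain the repository but is missing it or
-- holds an outdated copy of it (the replication target) with a healthy storage that has the
-- repository on its latest generation (the replication source).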
update_jobs AS (
	SELECT DISTINCT ON (repository_id, target_node_storage)
		repository_id,
		virtual_storage,
		relative_path,
		source_node_storage,
		target_node_storage
	FROM (
		SELECT repository_id, virtual_storage, relative_path, storage AS target_node_storage
		FROM repositories
		JOIN healthy_storages USING (virtual_storage)
		LEFT JOIN (
			SELECT repository_id, storage, generation
			FROM storage_repositories
		) AS storage_repositories USING (repository_id, storage)
		WHERE COALESCE(storage_repositories.generation != repositories.generation, true)
		AND (
			-- If assignments exist for the repository, only the assigned storages are targeted
			-- for replication. If no assignments exist, every healthy node is targeted for
			-- replication.
			SELECT COUNT(storage) = 0 OR COUNT(storage) FILTER (WHERE storage = healthy_storages.storage) = 1
			FROM repository_assignments
			WHERE repository_id = repositories.repository_id
		)
		ORDER BY virtual_storage, relative_path
	) AS unhealthy_repositories
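	-- Join against the storages that hold the repository on its latest generation; one of them is
	-- picked as the replication source. Storages whose replica is about to be removed by a pending
	-- 'delete_replica' job are not used as sources.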
	JOIN (
		SELECT repository_id, storage AS source_node_storage
		FROM (
			SELECT repository_id, relative_path, storage, generation
			FROM storage_repositories
		) AS storage_repositories
		JOIN repositories USING (repository_id, relative_path, generation)
		JOIN healthy_storages USING (virtual_storage, storage)
		WHERE NOT EXISTS (
			SELECT FROM replication_queue
			WHERE state NOT IN ('completed', 'dead')
			AND (job->>'repository_id')::bigint = repository_id
			AND job->>'target_node_storage' = storage
			AND job->>'change' = 'delete_replica'
		)
		ORDER BY virtual_storage, relative_path
	) AS healthy_repositories USING (repository_id)
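	-- Skip targets that already have a pending 'update' job scheduled against them.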
	WHERE NOT EXISTS (
		SELECT FROM replication_queue
		WHERE state NOT IN ('completed', 'dead')
		AND (job->>'repository_id')::bigint = repository_id
		AND job->>'target_node_storage' = target_node_storage
		AND job->>'change' = 'update'
	)
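	-- DISTINCT ON together with the random() ordering picks one random source storage for each
	-- (repository, target storage) pair.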
	ORDER BY repository_id, target_node_storage, random()
),
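-- reconciliation_jobs inserts the computed 'update' and 'delete_replica' jobs into the replication
-- queue. The lock_id identifies the replica a job targets, and a correlation ID is generated so the
-- job can be tied back to the log entries emitted below.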
reconciliation_jobs AS (
	INSERT INTO replication_queue (lock_id, job, meta)
	SELECT
		(virtual_storage || '|' || target_node_storage || '|' || relative_path),
		to_jsonb(reconciliation_jobs),
		jsonb_build_object('correlation_id', encode(random()::text::bytea, 'base64'))
	FROM (
		SELECT
			COALESCE(repository_id, 0) AS repository_id,
			virtual_storage,
			relative_path,
			source_node_storage,
			target_node_storage,
			'update' AS change
		FROM update_jobs
		UNION ALL
		SELECT
			COALESCE(repository_id, 0) AS repository_id,
			virtual_storage,
			relative_path,
			NULL AS source_node_storage,
			storage AS target_node_storage,
			'delete_replica' AS change
		FROM delete_jobs
	) AS reconciliation_jobs
	-- Only perform the inserts if we managed to acquire the advisory lock, as otherwise we would
	-- schedule duplicate jobs.
	WHERE ( SELECT acquired FROM reconciliation_lock )
	RETURNING lock_id, meta, job
),
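-- create_locks ensures a replication_queue_lock row exists for every lock_id referenced by the
-- newly inserted jobs.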
create_locks AS (
	INSERT INTO replication_queue_lock(id)
	SELECT lock_id
	FROM reconciliation_jobs
	ON CONFLICT (id) DO NOTHING
)
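-- Return the scheduled jobs so they can be logged below.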
SELECT
	meta->>'correlation_id',
	job->>'repository_id',
	job->>'change',
	job->>'virtual_storage',
	job->>'relative_path',
	job->>'source_node_storage',
	job->>'target_node_storage'
FROM reconciliation_jobs
`, advisorylock.Reconcile, virtualStorages, storages)
	if err != nil {
		return fmt.Errorf("query: %w", err)
	}

	defer func() {
		if err := rows.Close(); err != nil {
			r.log.WithError(err).Error("error closing rows")
		}
	}()
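	// Scan the jobs scheduled by the query and log them in batches of logBatchSize entries.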
	jobs := make([]job, 0, logBatchSize)
	for rows.Next() {
		var j job
		if err := rows.Scan(
			&j.CorrelationID,
			&j.RepositoryID,
			&j.Change,
			&j.VirtualStorage,
			&j.RelativePath,
			&j.SourceStorage,
			&j.TargetStorage,
		); err != nil {
			return fmt.Errorf("scan: %w", err)
		}

		jobs = append(jobs, j)
		if len(jobs) == logBatchSize {
			r.logJobs(jobs)
			jobs = jobs[:0]
		}
	}

	if err = rows.Err(); err != nil {
		return fmt.Errorf("rows.Err: %w", err)
	}

	if len(jobs) > 0 {
		r.logJobs(jobs)
	} else {
		r.log.Debug("reconciliation did not result in any scheduled jobs")
	}

	return nil
}