// Excerpt from internal/praefect/reconciler/reconciler.go (lines 117-342).

// reconcile performs a single reconciliation run: it collects the currently
// healthy storages, then executes one SQL statement that — while holding an
// advisory lock to prevent concurrent reconcilers from scheduling duplicate
// work — inserts 'update' jobs (to repair outdated replicas) and
// 'delete_replica' jobs (to remove replicas from unassigned storages) into
// the replication queue, and finally logs every job that was scheduled.
// Returns an error only on query or scan failures.
func (r *Reconciler) reconcile(ctx context.Context) error {
	// Record how long a full reconciliation run takes.
	defer prometheus.NewTimer(r.reconciliationSchedulingDuration).ObserveDuration()

	// Parallel slices: virtualStorages[i] pairs with storages[i]. They are
	// passed to the query as two text arrays ($2, $3) and zipped back into
	// (virtual_storage, storage) rows via unnest() in healthy_storages.
	var virtualStorages []string
	var storages []string

	for virtualStorage, healthyStorages := range r.hc.HealthyNodes() {
		if len(healthyStorages) < 2 {
			// minimum two healthy storages within a virtual storage needed for valid
			// replication source and target
			r.log.WithField("virtual_storage", virtualStorage).Info("reconciliation skipped for virtual storage due to not having enough healthy storages")
			continue
		}

		for _, storage := range healthyStorages {
			virtualStorages = append(virtualStorages, virtualStorage)
			storages = append(storages, storage)
		}
	}

	// Nothing to do if no virtual storage has enough healthy nodes.
	if len(virtualStorages) == 0 {
		return nil
	}

	rows, err := r.db.QueryContext(ctx, `
WITH reconciliation_lock AS (
	SELECT pg_try_advisory_xact_lock($1) AS acquired
),

healthy_storages AS (
    SELECT unnest($2::text[]) AS virtual_storage,
           unnest($3::text[]) AS storage
),

delete_jobs AS (
	SELECT DISTINCT ON (repository_id)
		repository_id,
		virtual_storage,
		relative_path,
		storage
	FROM (
		SELECT repository_id, storage, generation
		FROM storage_repositories
	) AS storage_repositories
	JOIN repositories USING (repository_id)
	JOIN healthy_storages USING (virtual_storage, storage)
	WHERE (
		-- Only unassigned repositories should be targeted for deletion. If no assignment exists,
		-- every storage is considered assigned, thus no deletion would be scheduled.
		SELECT COUNT(storage) > 0 AND COUNT(storage) FILTER (WHERE storage = storage_repositories.storage) = 0
		FROM repository_assignments
		WHERE repository_id = repositories.repository_id
	)
	AND storage_repositories.generation <= (
		-- Check whether the replica's generation is equal or lower than the generation of every assigned
		-- replica of the repository. If so, then it is eligible for deletion.
		SELECT MIN(COALESCE(generation, -1))
		FROM repository_assignments
		FULL JOIN storage_repositories USING (repository_id, storage)
		WHERE repository_id = repositories.repository_id
	) AND NOT EXISTS (
		-- Ensure the replica is not used as target or source in any scheduled job. This is to avoid breaking
		-- any already scheduled jobs.
		SELECT FROM replication_queue
		WHERE (job->'repository_id')::bigint = repository_id
		AND (
				job->>'source_node_storage' = storage
			OR 	job->>'target_node_storage' = storage
		)
		AND state NOT IN ('completed', 'dead')
	) AND NOT EXISTS (
		-- Ensure there are no other scheduled 'delete_replica' type jobs for the repository. Performing rapid
		-- repository_assignments could cause the reconciler to schedule deletion against all replicas. To avoid this,
		-- we do not allow more than one 'delete_replica' job to be active at any given time.
		SELECT FROM replication_queue
		WHERE state NOT IN ('completed', 'dead')
		AND (job->>'repository_id')::bigint = repository_id
		AND job->>'change' = 'delete_replica'
	)
),

update_jobs AS (
	SELECT DISTINCT ON (repository_id, target_node_storage)
		repository_id,
		virtual_storage,
		relative_path,
		source_node_storage,
		target_node_storage
	FROM (
		SELECT repository_id, virtual_storage, relative_path, storage AS target_node_storage
		FROM repositories
		JOIN healthy_storages USING (virtual_storage)
		LEFT JOIN (
			SELECT repository_id, storage, generation
			FROM storage_repositories
		) AS storage_repositories USING (repository_id, storage)
		WHERE COALESCE(storage_repositories.generation != repositories.generation, true)
		AND (
			-- If assignments exist for the repository, only the assigned storages are targeted for replication.
			-- If no assignments exist, every healthy node is targeted for replication.
			SELECT COUNT(storage) = 0 OR COUNT(storage) FILTER (WHERE storage = healthy_storages.storage) = 1
			FROM repository_assignments
			WHERE repository_id = repositories.repository_id
		)
		ORDER BY virtual_storage, relative_path
	) AS unhealthy_repositories
	JOIN (
		SELECT repository_id, storage AS source_node_storage
		FROM (
			SELECT repository_id, relative_path, storage, generation
			FROM storage_repositories
		) AS storage_repositories
		JOIN repositories USING (repository_id, relative_path, generation)
		JOIN healthy_storages USING (virtual_storage, storage)
		WHERE NOT EXISTS (
			SELECT FROM replication_queue
			WHERE state NOT IN ('completed', 'dead')
			AND (job->>'repository_id')::bigint = repository_id
			AND job->>'target_node_storage' = storage
			AND job->>'change' = 'delete_replica'
		)
		ORDER BY virtual_storage, relative_path
	) AS healthy_repositories USING (repository_id)
	WHERE NOT EXISTS (
		SELECT FROM replication_queue
		WHERE state NOT IN ('completed', 'dead')
		AND (job->>'repository_id')::bigint = repository_id
		AND job->>'target_node_storage' = target_node_storage
		AND job->>'change' = 'update'
	)
	ORDER BY repository_id, target_node_storage, random()
),

reconciliation_jobs AS (
	INSERT INTO replication_queue (lock_id, job, meta)
	SELECT
		(virtual_storage || '|' || target_node_storage || '|' || relative_path),
		to_jsonb(reconciliation_jobs),
		jsonb_build_object('correlation_id', encode(random()::text::bytea, 'base64'))
	FROM (
		SELECT
			COALESCE(repository_id, 0) AS repository_id,
			virtual_storage,
			relative_path,
			source_node_storage,
			target_node_storage,
			'update' AS change
		FROM update_jobs
			UNION ALL
		SELECT
			COALESCE(repository_id, 0) AS repository_id,
			virtual_storage,
			relative_path,
			NULL AS source_node_storage,
			storage AS target_node_storage,
			'delete_replica' AS change
		FROM delete_jobs
	) AS reconciliation_jobs
	-- only perform inserts if we managed to acquire the lock as otherwise
	-- we'd schedule duplicate jobs
	WHERE ( SELECT acquired FROM reconciliation_lock )
	RETURNING lock_id, meta, job
),

create_locks AS (
	INSERT INTO replication_queue_lock(id)
	SELECT lock_id
	FROM reconciliation_jobs
	ON CONFLICT (id) DO NOTHING
)

SELECT
	meta->>'correlation_id',
	job->>'repository_id',
	job->>'change',
	job->>'virtual_storage',
	job->>'relative_path',
	job->>'source_node_storage',
	job->>'target_node_storage'
FROM reconciliation_jobs
`, advisorylock.Reconcile, virtualStorages, storages)
	if err != nil {
		return fmt.Errorf("query: %w", err)
	}

	// Close errors are only logged: by this point the result set has either
	// been fully consumed or a scan error is already being returned.
	defer func() {
		if err := rows.Close(); err != nil {
			r.log.WithError(err).Error("error closing rows")
		}
	}()

	// Scheduled jobs are logged in batches of logBatchSize to bound the size
	// of any single log entry.
	jobs := make([]job, 0, logBatchSize)

	for rows.Next() {
		var j job
		if err := rows.Scan(
			&j.CorrelationID,
			&j.RepositoryID,
			&j.Change,
			&j.VirtualStorage,
			&j.RelativePath,
			&j.SourceStorage,
			&j.TargetStorage,
		); err != nil {
			return fmt.Errorf("scan: %w", err)
		}

		jobs = append(jobs, j)
		if len(jobs) == logBatchSize {
			r.logJobs(jobs)
			// Reuse the backing array for the next batch.
			jobs = jobs[:0]
		}
	}

	if err = rows.Err(); err != nil {
		return fmt.Errorf("rows.Err: %w", err)
	}

	// Flush the final partial batch, or note that the run was a no-op.
	if len(jobs) > 0 {
		r.logJobs(jobs)
	} else {
		r.log.Debug("reconciliation did not result in any scheduled jobs")
	}

	return nil
}