def _check_max_replication_slots()

in src/dma/collector/workflows/readiness_check/_postgres/main.py [0:0]


    def _check_max_replication_slots(self) -> None:
        """Save one `MAX_REPLICATION_SLOTS` rule result per entry in `self.rule_config`.

        Three values are read from `self.local_db`:

        * the row count of `extended_collection_postgres_all_databases`,
        * the `max_replication_slots` value from `collection_postgres_settings`,
        * the row count of `collection_postgres_replication_slots`.

        The baseline requirement is the database count plus the slots already
        counted as in use. For each config entry the upper bound adds that
        entry's `extra_replication_subscriptions_required`.

        Outcome per entry:

        * `ACTION_REQUIRED` when the configured value is below the baseline,
        * `WARNING` when it meets the baseline but not the upper bound,
        * `PASS` otherwise.
        """
        rule_code = "MAX_REPLICATION_SLOTS"
        url_link = "Refer to https://cloud.google.com/database-migration/docs/postgres/create-migration-job#specify-source-connection-profile-info for more info."

        def first_column_as_int(query: str) -> int:
            # A query that yields no row at all is treated as 0.
            row = self.local_db.sql(query).fetchone()
            return 0 if row is None else int(row[0])

        db_count = first_column_as_int("select count(*) from extended_collection_postgres_all_databases")
        total_replication_slots = first_column_as_int(
            "select c.setting_value as max_replication_slots from collection_postgres_settings c where c.setting_name='max_replication_slots';"
        )
        used_replication_slots = first_column_as_int("select count(*) from collection_postgres_replication_slots")
        required_replication_slots = db_count + used_replication_slots

        for c in self.rule_config:
            max_required_replication_slots = required_replication_slots + c.extra_replication_subscriptions_required

            # Branch order matters: the hard baseline is tested before the
            # parallelism-dependent upper bound.
            if total_replication_slots < required_replication_slots:
                severity = ACTION_REQUIRED
                message = f"Insufficient `max_replication_slots`: {total_replication_slots}, should be set to at least {required_replication_slots}. Up to {c.extra_replication_subscriptions_required} additional subscriptions might be required depending on the parallelism level set for migration. {url_link}"
            elif total_replication_slots < max_required_replication_slots:
                severity = WARNING
                message = f"`max_replication_slots` current value: {total_replication_slots}, this might need to be increased to {max_required_replication_slots} depending on the parallelism level set for migration. {url_link}"
            else:
                severity = PASS
                message = f"`max_replication_slots` current value: {total_replication_slots}, this meets or exceeds the maximum required value of {max_required_replication_slots}"

            self.save_rule_result(c.db_variant, rule_code, severity, message)