def _check_replication_role()

in src/dma/collector/workflows/readiness_check/_postgres/main.py [0:0]


    def _check_replication_role(self) -> None:
        """Record whether the user holds the ``rolreplication`` attribute.

        Nothing is recorded when ``self._is_rds()`` is truthy, or when the
        ``collection_postgres_replication_role`` table yields no row.
        Otherwise one result is saved per entry in ``self.rule_config``:
        ``ACTION_REQUIRED`` when the collected flag is false, ``PASS`` in
        every other case (including a NULL flag).
        """
        if self._is_rds():
            return
        rule_code = REPLICATION_ROLE
        result = self.local_db.sql("SELECT rolreplication FROM collection_postgres_replication_role").fetchone()
        if result is None:
            return

        flag = result[0]
        # The flag can arrive as a real boolean or as text ("false", "False",
        # or PostgreSQL's "f"), depending on how the column was loaded.
        # Matching only the literal "false" would report PASS for the others.
        if isinstance(flag, str):
            lacks_role = flag.strip().lower() in {"false", "f"}
        else:
            lacks_role = flag is False

        # The verdict does not depend on the rule config entry, so decide once.
        if lacks_role:
            severity = ACTION_REQUIRED
            message = "user does not have rolreplication role."
        else:
            severity = PASS
            message = "user has rolreplication role."

        for c in self.rule_config:
            self.save_rule_result(c.db_variant, rule_code, severity, message)