in src/dma/collector/workflows/readiness_check/_postgres/main.py [0:0]
def _check_replication_role(self) -> None:
if self._is_rds():
return
rule_code = REPLICATION_ROLE
result = self.local_db.sql("SELECT rolreplication FROM collection_postgres_replication_role").fetchone()
if result is None:
return
for c in self.rule_config:
if result[0] == "false":
self.save_rule_result(
c.db_variant,
rule_code,
ACTION_REQUIRED,
"user does not have rolreplication role.",
)
else:
self.save_rule_result(
c.db_variant,
rule_code,
PASS,
"user has rolreplication role.",
)