in src/dma/collector/workflows/readiness_check/_postgres/main.py [0:0]
def _check_version(self) -> None:
    """Record one ``DATABASE_VERSION`` rule result per configured variant.

    The detected version string (``self.db_version``) is echoed to the
    console, split into major and minor components by
    ``get_db_major_version`` / ``get_db_minor_version``, and then compared
    against every entry of ``self.rule_config``:

    * When ``self._is_rds()`` is truthy, only the minor version is checked,
      against the entry's ``rds_minor_version_support_map`` looked up by the
      detected major version.
    * Otherwise the major version must be present in ``db_version_map`` and
      lie within the entry's minimum / (optional) maximum supported major
      versions.

    Exactly one result (``ERROR`` or ``PASS``) is saved per entry through
    ``self.save_rule_result``.
    """
    rule_code = "DATABASE_VERSION"
    self.console.print(f"version: {self.db_version}")
    major = get_db_major_version(self.db_version)
    minor = get_db_minor_version(self.db_version)
    on_rds = self._is_rds()
    self.console.print(
        f"version: {self.db_version}, major version: {major}, minor version: {minor}"
    )

    for variant_rule in self.rule_config:
        if on_rds:
            # NOTE(review): this branch never applies the major-version
            # bounds used for non-RDS sources; when the map has no entry for
            # the detected major version the lookup is falsy and the result
            # is PASS. Confirm that this is intended.
            required_minor = variant_rule.rds_minor_version_support_map.get(major)
            if required_minor and minor < required_minor:
                outcome = ERROR
                message = f"Source RDS database server has unsupported minor version: ({minor})"
            else:
                outcome = PASS
                message = f"Version {self.db_version} is supported. Please ensure that you selected a version that meets or exceeds version {major!s}."
        else:
            # A falsy maximum means "no upper bound configured".
            unsupported_major = (
                major not in variant_rule.db_version_map
                or major < variant_rule.minimum_supported_major_version
                or (
                    variant_rule.maximum_supported_major_version
                    and major > variant_rule.maximum_supported_major_version
                )
            )
            if unsupported_major:
                outcome = ERROR
                message = f"Migration from source database server ({self.db_version}) is not supported"
            else:
                outcome = PASS
                message = f"Version {self.db_version} is supported. Please ensure that you selected a version that meets or exceeds version {major!s}."

        self.save_rule_result(variant_rule.db_variant, rule_code, outcome, message)