in src/dma/collector/workflows/readiness_check/base.py [0:0]
def execute_readiness_check(self) -> None:
    """Run the readiness check for the source database engine.

    Selects an executor based on ``self.src_info.db_type`` (``"POSTGRES"``
    or ``"MYSQL"``), stores it on ``self.executor`` and calls its
    ``execute()`` method.

    Raises:
        ApplicationError: If ``self.src_info.db_type`` is neither
            ``"POSTGRES"`` nor ``"MYSQL"``.
    """
    engine = self.src_info.db_type

    # The executor modules are imported inside the branches (not at module
    # level) to help with circular import issues.
    if engine == "POSTGRES":
        from dma.collector.workflows.readiness_check._postgres.main import (  # noqa: PLC0415
            PostgresReadinessCheckExecutor,
        )

        self.executor = PostgresReadinessCheckExecutor(readiness_check=self, console=self.console)
    elif engine == "MYSQL":
        from dma.collector.workflows.readiness_check._mysql.main import (  # noqa: PLC0415
            MySQLReadinessCheckExecutor,
        )

        self.executor = MySQLReadinessCheckExecutor(readiness_check=self, console=self.console)
    else:
        msg = f"{self.src_info.db_type} is not implemented."
        raise ApplicationError(msg)

    # Both supported engines run through the same entry point.
    self.executor.execute()