in src/dma/collector/query_managers/base.py [0:0]
def execute_ddl_scripts(self, *args: Any, **kwargs: Any) -> None:
    """Execute every available DDL script and report progress on the console.

    Prints a "CANONICAL DATA MODEL" banner, then passes each name returned by
    ``self.available_queries("ddl")`` to ``self.execute`` while a console status
    line shows which script is currently running. A confirmation line is
    printed after each script completes; when there are no DDL scripts a
    notice is printed instead.

    Args:
        *args: Accepted but not used by this method.
        **kwargs: Accepted but not used by this method.
    """
    console.print(Padding("CANONICAL DATA MODEL", 1, style="bold", expand=True), width=80)
    # Resolve the script list once: the loop and the emptiness check below then
    # work from the same snapshot, and the lookup is not repeated. Wrapping in
    # list() keeps the emptiness check meaningful even for a lazy iterable.
    ddl_scripts = list(self.available_queries("ddl"))
    with console.status("[bold green]Creating tables...[/]") as status:
        for script in ddl_scripts:
            status.update(rf" [yellow]*[/] Executing [bold magenta]`{script}`[/]")
            self.execute(script)
            status.console.print(rf" [green]:heavy_check_mark:[/] Created [bold magenta]`{script}`[/]")
        if not ddl_scripts:
            console.print(" [dim grey]:heavy_check_mark: No DDL scripts to load[/]")