dbt/adapters/maxcompute/relation_configs/_materialized_view.py (98 lines of code) (raw):

from dataclasses import dataclass from typing import Any, Dict, Optional, List from dbt.adapters.contracts.relation import ( RelationConfig, ComponentName, ) from dbt.adapters.maxcompute.relation_configs._base import MaxComputeBaseRelationConfig from dbt.adapters.maxcompute.relation_configs._partition import ( PartitionConfig, ) from dbt.adapters.maxcompute.utils import quote_string, quote_ref @dataclass(frozen=True, eq=True, unsafe_hash=True) class MaxComputeMaterializedViewConfig(MaxComputeBaseRelationConfig): name: str project: str schema: str lifecycle: Optional[int] = None build_deferred: bool = False columns: Optional[List[str]] = None column_comment: Optional[Dict[str, str]] = None disable_rewrite: bool = False table_comment: Optional[str] = None partition_by: Optional[PartitionConfig] = None tblProperties: Optional[Dict[str, str]] = None @classmethod def from_dict(cls, config_dict: Dict[str, Any]) -> "MaxComputeMaterializedViewConfig": # required kwargs_dict: Dict[str, Any] = { "name": cls._render_part(ComponentName.Identifier, config_dict["name"]), "schema": cls._render_part(ComponentName.Schema, config_dict["schema"]), "project": cls._render_part(ComponentName.Database, config_dict["project"]), } for key, value in config_dict.items(): if key in ["name", "schema", "project"]: pass kwargs_dict[key] = value if partition := config_dict.get("partition_by"): kwargs_dict.update({"partition_by": PartitionConfig.parse(partition)}) materialized_view: "MaxComputeMaterializedViewConfig" = super().from_dict(kwargs_dict) return materialized_view @classmethod def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any]: config_dict = { "name": relation_config.identifier, "schema": relation_config.schema, "project": relation_config.database, } items = [ "lifecycle", "build_deferred", "columns", "column_comment", "disable_rewrite", "table_comment", "partition_by", "tblProperties", ] if relation_config: for item in items: if item in relation_config.config: config_dict.update({item: relation_config.config[item]}) return config_dict def get_coordinate(self) -> str: if self.schema is None: return f"{self.name}" if self.project is None: return f"{self.schema}.{self.name}" return f"{self.project}.{self.schema}.{self.name}" def create_table_sql(self) -> str: sql = f"CREATE MATERIALIZED VIEW IF NOT EXISTS {self.get_coordinate()}\n" if self.lifecycle and self.lifecycle > 0: sql += f"LIFECYCLE {self.lifecycle}\n" if self.build_deferred: sql += "BUILD DEFERRED\n" if self.columns and len(self.columns) > 0: sql += "(" for column in self.columns: if self.column_comment and column in self.column_comment: sql += ( f"{quote_ref(column)} COMMENT {quote_string(self.column_comment[column])}" ) else: sql += f"{quote_ref(column)}" sql += ", " sql = sql[:-2] sql += ")\n" if self.disable_rewrite: sql += " DISABLE REWRITE\n" if self.table_comment: sql += f"COMMENT {quote_string(self.table_comment)}\n" if self.partition_by and len(self.partition_by.fields) > 0: sql += f"PARTITIONED BY({self.partition_by.render(False)})\n" if self.tblProperties and len(self.tblProperties) > 0: sql += "TBLPROPERTIES( " for k, v in self.tblProperties.items(): sql += f'"{k}"="{v}", ' sql = sql[:-2] sql += ")\n" return sql