dbt/adapters/maxcompute/relation_configs/_partition.py (69 lines of code) (raw):

from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any

import dbt_common.exceptions
from dbt.adapters.contracts.relation import RelationConfig
from dbt_common.dataclass_schema import dbtClassMixin

# Column types (lower-case) that make `PartitionConfig.auto_partition` return True.
_AUTO_PARTITION_TYPES = frozenset({"timestamp", "date", "datetime", "timestamp_ntz"})

# `partition_by` keys whose value is a list of names rather than a scalar.
_LIST_KEYS = ("fields", "data_types")


def _split_names(value: Any) -> List[str]:
    """Normalize a `fields` / `data_types` value into a list of stripped names.

    Accepts either a comma-separated string (``"a, b"``) or a list/tuple of
    strings. Empty entries (e.g. from a trailing comma or an empty string)
    are dropped so they never become nameless columns.

    Raises:
        TypeError: if `value` is neither a string nor a list/tuple of strings.
    """
    if isinstance(value, str):
        items = value.split(",")
    elif isinstance(value, (list, tuple)):
        items = list(value)
    else:
        raise TypeError(
            f"expected a comma-separated string or a list, got {type(value).__name__}"
        )

    names = []
    for item in items:
        if not isinstance(item, str):
            raise TypeError(f"expected string entries, got {type(item).__name__}")
        stripped = item.strip()
        if stripped:
            names.append(stripped)
    return names


@dataclass
class PartitionConfig(dbtClassMixin):
    """Partition specification parsed from a model's `partition_by` config."""

    # NOTE(review): the next three attributes are not read by any method in
    # this class; their consumers live elsewhere — confirm semantics there.
    granularity: str = "day"
    copy_partitions: bool = False
    generate_column_name: Optional[str] = None
    # Partition column names, in declaration order.
    fields: List[str] = field(default_factory=list)
    # Column types parallel to `fields`; when empty, `render` types every
    # column as `string`.
    data_types: List[str] = field(default_factory=list)

    def auto_partition(self) -> bool:
        """Return True if any declared data type is a date/time type."""
        return any(t.lower() in _AUTO_PARTITION_TYPES for t in self.data_types)

    def render(self, with_type: bool = True) -> str:
        """Render the partition columns as a comma-separated string.

        Args:
            with_type: when True each column is followed by its data type
                (`string` if no data types were configured); when False only
                the column names are emitted.

        Returns:
            e.g. ``"ds string, hh string"`` or ``"ds, hh"``; an empty string
            when there are no fields.
        """
        if not with_type:
            return ", ".join(self.fields)
        if not self.data_types:
            return ", ".join(f"{name} string" for name in self.fields)
        # Index (rather than zip) so a length mismatch that slipped past
        # `post_validate` fails loudly instead of silently dropping columns.
        return ", ".join(
            f"{name} {self.data_types[i]}" for i, name in enumerate(self.fields)
        )

    @classmethod
    def parse(cls, raw_partition_by) -> Optional["PartitionConfig"]:
        """Build and validate a `PartitionConfig` from a raw `partition_by` value.

        `fields` and `data_types` may each be a comma-separated string or a
        list of strings; every other key is passed through unchanged.

        Returns:
            The validated config, or None when `raw_partition_by` is None.

        Raises:
            CompilationError: if `raw_partition_by` is not a mapping, or one
                of its values has an unusable type.
            DbtValidationError: if `post_validate` rejects the parsed config.
        """
        if raw_partition_by is None:
            return None
        try:
            try:
                entries = raw_partition_by.items()
            except AttributeError as exc:
                # Non-mapping input (e.g. a bare string): route it to the
                # CompilationError below instead of leaking AttributeError.
                raise TypeError("partition_by must be a dictionary") from exc

            normalized = {
                key: _split_names(value) if key in _LIST_KEYS else value
                for key, value in entries
            }
            config = cls.from_dict(normalized)
            config.post_validate()
            return config
        except TypeError as exc:
            raise dbt_common.exceptions.CompilationError(
                f"Invalid partition_by config:\n"
                f" Got: {raw_partition_by}\n"
                f' Expected a dictionary with "fields" and "data_types" keys'
            ) from exc

    @classmethod
    def parse_model_node(cls, relation_config: RelationConfig) -> Optional[Dict[str, Any]]:
        """
        Extract the raw config for `PartitionConfig.parse` from a model node.

        Returns the `partition_by` entry of `relation_config.config.extra`
        unmodified, or None when the model does not set it.

        NOTE(review): an earlier version of this docstring stated that
        `time_ingestion_partitioning` and `copy_partitions` are not collected
        because this was built for materialized views. The code simply
        returns the whole mapping, so any such keys pass through — confirm
        the intended behavior.
        """
        config_dict: Optional[Dict[str, Any]] = relation_config.config.extra.get(
            "partition_by"
        )
        return config_dict

    def post_validate(self) -> None:
        """Check cross-field invariants after parsing.

        Raises:
            DbtValidationError: if data types are given but their count
                differs from the number of fields, or if auto partitioning
                is in effect with more than one partition column.
        """
        # An empty `data_types` is allowed: every column then defaults to string.
        if self.data_types and len(self.data_types) != len(self.fields):
            raise dbt_common.exceptions.DbtValidationError(
                f"Invalid partition_by config:\n"
                f" Got: {self.fields}\n"
                f" Got: {self.data_types}\n"
                f" Expected the same number of fields and data types"
            )
        if self.auto_partition() and len(self.fields) > 1:
            raise dbt_common.exceptions.DbtValidationError(
                f"Invalid partition_by config:\n"
                f" Got: {self.fields}\n"
                f" Expected a single partition column for auto partitioning"
            )