in awswrangler/_utils.py [0:0]
def check_schema_changes(columns_types: dict[str, str], table_input: dict[str, Any] | None, mode: str) -> None:
    """Reject new columns or changed column types relative to an existing table.

    The comparison only runs when ``table_input`` is not ``None`` and ``mode``
    is ``"append"`` or ``"overwrite_partitions"``; in every other case the
    function returns without inspecting its arguments.

    Parameters
    ----------
    columns_types
        Mapping of column name to type for the incoming schema.
    table_input
        Existing table definition, or ``None``. When checked, its
        ``["StorageDescriptor"]["Columns"]`` entries are read for their
        ``"Name"`` and ``"Type"`` values.
    mode
        Mode string that decides whether the check applies.

    Raises
    ------
    exceptions.InvalidArgumentValue
        If a column in ``columns_types`` is absent from the table's columns,
        or is present with a different type. Columns are examined in the
        iteration order of ``columns_types`` and the first violation raises.
    """
    # Guard clause: nothing to compare against, or a mode that is not checked.
    if table_input is None or mode not in ("append", "overwrite_partitions"):
        return

    # Build the name -> type lookup for the columns already on the table.
    existing_types: dict[str, str] = {}
    for column in table_input["StorageDescriptor"]["Columns"]:
        existing_types[column["Name"]] = column["Type"]

    for name, new_type in columns_types.items():
        # An unknown column is reported before any type comparison is attempted.
        if name not in existing_types:
            raise exceptions.InvalidArgumentValue(
                f"Schema change detected: New column {name} with type {new_type}. "
                "Please pass schema_evolution=True to allow new columns "
                "behaviour."
            )
        old_type = existing_types[name]
        if new_type != old_type:
            raise exceptions.InvalidArgumentValue(
                f"Schema change detected: Data type change on column {name} "
                f"(Old type: {old_type} / New type {new_type})."
            )