dbt/include/maxcompute/macros/relations/table/create.sql (122 lines of code) (raw):
{% macro maxcompute__create_table_as(temporary, relation, sql) -%}
{%- set is_transactional = config.get('transactional') or config.get('delta') -%}
{%- set primary_keys = config.get('primary_keys') -%}
{%- set delta_table_bucket_num = config.get('delta_table_bucket_num', 16)-%}
{%- set raw_partition_by = config.get('partition_by', none) -%}
{%- set lifecycle = config.get('lifecycle', none) -%}
{%- set tblproperties = config.get('tblproperties', none) -%}
{%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%}
{{ create_table_as_internal(temporary, relation, sql, is_transactional, primary_keys, delta_table_bucket_num, partition_config, lifecycle, tblproperties) }}
{%- endmacro %}
{% macro create_table_as_internal(temporary, relation, sql, is_transactional, primary_keys=none, delta_table_bucket_num=16, partition_config=none, lifecycle=none, tblproperties=none) -%}
{%- set sql_hints = config.get('sql_hints', none) -%}
{%- set sql_header = merge_sql_hints_and_header(sql_hints, config.get('sql_header', none)) -%}
{%- set is_delta = is_transactional and primary_keys is not none and primary_keys|length > 0 -%}
{% call statement('create_table', auto_begin=False) -%}
{{ sql_header if sql_header is not none }}
create table {{ relation.render() }} (
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced and (not temporary) %}
{{ get_assert_columns_equivalent(sql) }}
{{ get_table_columns_and_constraints_without_brackets(partition_config) }}
{%- set sql = get_select_subquery(sql) %}
{%- else -%}
{{ get_table_columns(sql, primary_keys, partition_config, sql_header) }}
{%- endif -%}
{% if is_delta -%}
,primary key(
{%- for pk in primary_keys -%}
{{ pk }}{{ "," if not loop.last }}
{%- endfor -%})
{%- endif -%}
)
{% if partition_config -%}
{{ partition_by(partition_config) }}
{%- endif -%}
{% set extra_props = {} %}
{% if tblproperties %}
{% do extra_props.update(tblproperties) %}
{% endif %}
{% if is_transactional %}
{% do extra_props.update({"transactional": "true"}) %}
{% if is_delta %}
{% do extra_props.update({"write.bucket.num": delta_table_bucket_num}) %}
{% endif %}
{% endif %}
{% if extra_props %}
tblproperties(
{% for key, value in extra_props.items() %}
"{{ key }}"="{{ value }}"{{ "," if not loop.last }}
{% endfor %}
)
{% endif %}
{%- if lifecycle %}
LIFECYCLE {{ lifecycle }}
{%- elif temporary %}
LIFECYCLE 1
{%- endif %}
;
{%- endcall -%}
{{ sql_header if sql_header is not none }}
insert into {{ relation.render() }}
{% if partition_config and partition_config.fields|length > 0 and not partition_config.auto_partition() -%}
partition({{ partition_config.render(False) }})
{%- endif -%}
(
{% for c in get_column_schema_from_query(sql, sql_header) -%}
`{{ c.name }}`{{ "," if not loop.last }}
{% endfor %}
)(
{{ sql }}
);
{%- endmacro %}
{% macro get_table_columns(sql, primary_keys=none, partition_config=None, sql_header=None) -%}
{% set model_columns = model.columns %}
{% set partition_by_cols = [] if (partition_config is none or partition_config.auto_partition()) else partition_config.fields %}
{% set ns = namespace(needs_comma=false) %} {# 初始化命名空间变量 #}
{% for c in get_column_schema_from_query(sql, sql_header) -%}
{% if c.name not in partition_by_cols -%}
{{- "," if ns.needs_comma -}} {# 根据命名空间变量判断逗号 #}
{{ c.name }} {{ c.dtype }}
{% if primary_keys and c.name in primary_keys %}not null{% endif %}
{% if model_columns and c.name in model_columns %} {# 从模型配置中读取约束 #}
{% for constraint in model_columns[c.name].constraints %}
{% if constraint.type == 'not_null' %}
{% if not primary_keys or c.name not in primary_keys %}
not null {# 避免重复增加 not null #}
{% endif %}
{% endif %}
{% endfor %}
{{ "COMMENT" }} {{ quote_and_escape(model_columns[c.name].description) }}
{%- endif %}
{% set ns.needs_comma = true %} {# 标记后续列需要逗号 #}
{%- endif %}
{% endfor %}
{%- endmacro %}
{% macro quote_and_escape(input_string) %}
{% set escaped_string = input_string | replace("'", "\\'") %}
'{{ escaped_string }}'
{% endmacro %}
-- Compared to get_table_columns_and_constraints, only the surrounding brackets are deleted
{% macro get_table_columns_and_constraints_without_brackets(partition_config=None) -%}
{# loop through user_provided_columns to create DDL with data types and constraints #}
{%- set raw_column_constraints = adapter.mc_render_raw_columns_constraints(raw_columns=model['columns'], partition_config=partition_config) -%}
{%- set raw_model_constraints = adapter.render_raw_model_constraints(raw_constraints=model['constraints']) -%}
{% for c in raw_column_constraints -%}
{{ c }}{{ "," if not loop.last or raw_model_constraints }}
{% endfor %}
{% for c in raw_model_constraints -%}
{{ c }}{{ "," if not loop.last }}
{% endfor -%}
{%- endmacro %}
{% macro merge_sql_hints_and_header(sql_hints=None, sql_header=None) -%}
{%- set parts = [] -%}
{%- if sql_hints -%}
{%- for key, value in sql_hints.items() -%}
{%- do parts.append('set ' ~ key ~ '=' ~ value ~ ';') -%}
{%- endfor -%}
{%- endif -%}
{%- if sql_header -%}
{%- do parts.append(sql_header) -%}
{%- endif -%}
{{- parts | join('\n') | trim -}}
{%- endmacro -%}