{#-- dbt/include/maxcompute/macros/materializations/incremental/merge.sql --#}
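{#--
maxcompute__get_merge_sql
Renders a MERGE statement for the "merge" incremental strategy. With a
unique_key (a single column name or a list of them), matched target rows are
updated and unmatched source rows are inserted; without one, the macro falls
back to a plain append-only INSERT.
--#}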
{% macro maxcompute__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none) -%}
{%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%}
{%- set dest_cols_names = get_quoted_list(dest_columns | map(attribute="name")) -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{%- set merge_update_columns = config.get('merge_update_columns') -%}
{%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
{%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}
{%- set sql_header = config.get('sql_header', none) -%}
{{ sql_header if sql_header is not none }}
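{#-- Build one equality predicate per key column; a list-valued unique_key yields a composite match condition. --#}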
{% if unique_key %}
{% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
{% for key in unique_key %}
{% set this_key_match %}
DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }}
{% endset %}
{% do predicates.append(this_key_match) %}
{% endfor %}
{% else %}
{% set unique_key_match %}
DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{% endset %}
{% do predicates.append(unique_key_match) %}
{% endif %}
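{#--
Illustrative rendered output for a hypothetical model with unique_key='id',
columns id and name, and merge_update_columns=['name'] (backtick quoting shown
here is illustrative):

merge into my_project.my_target as DBT_INTERNAL_DEST
using my_project.my_target__dbt_tmp as DBT_INTERNAL_SOURCE
on (DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id)
when matched then update set
DBT_INTERNAL_DEST.name = DBT_INTERNAL_SOURCE.name
when not matched then insert
(`id`, `name`)
values (
DBT_INTERNAL_SOURCE.`id`, DBT_INTERNAL_SOURCE.`name`);
--#}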
merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
on {{"(" ~ predicates | join(") and (") ~ ")"}}
when matched then update set
{% for column_name in update_columns -%}
DBT_INTERNAL_DEST.{{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
{%- if not loop.last %}, {%- endif %}
{%- endfor %}
when not matched then insert
({{ dest_cols_csv }})
values (
{% for column in dest_cols_names %}
DBT_INTERNAL_SOURCE.{{ column }} {{- ',' if not loop.last -}}
{% endfor %});
{% else %}
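{#-- No unique_key configured: append-only insert of the new rows. --#}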
INSERT INTO {{ target }} ({{ dest_cols_csv }})
SELECT {{ dest_cols_csv }}
FROM {{ source }}
{% endif %}
{% endmacro %}
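{#--
maxcompute__get_delete_insert_merge_sql
Renders the "delete+insert" incremental strategy: target rows that match the
unique_key (and any incremental_predicates) are deleted first, then every row
from the source relation is inserted.
--#}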
{% macro maxcompute__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{% if unique_key %}
{% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
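{#-- List-valued unique_key: delete target rows that match the source on every key column. --#}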
delete from {{ target }}
using {{ source }}
where (
{% for key in unique_key %}
{{ source }}.{{ key }} = {{ target }}.{{ key }}
{{ "and " if not loop.last}}
{% endfor %}
{% if incremental_predicates %}
{% for predicate in incremental_predicates %}
and {{ predicate }}
{% endfor %}
{% endif %}
);
{% else %}
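{#-- Single-column unique_key: delete via an IN subquery against the source. --#}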
delete from {{ target }}
where ({{ unique_key }}) in (
select ({{ unique_key }})
from {{ source }}
)
{%- if incremental_predicates %}
{% for predicate in incremental_predicates %}
and {{ predicate }}
{% endfor %}
{%- endif -%};
{% endif %}
{% endif %}
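{#-- Reinsert all rows from the source relation. --#}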
insert into {{ target }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source }}
)
{%- endmacro %}
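{#--
maxcompute__get_insert_overwrite_merge_sql
Renders the "insert_overwrite" strategy: matching rows (or the whole table
when no predicates are given) are cleared first, then the source rows are
inserted.
--#}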
{% macro maxcompute__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) -%}
{#-- In dbt's default implementation (which this macro follows), include_sql_header --#}
{#-- is only True for BigQuery's insert_overwrite strategy with "static" partitions config. --#}
{#-- We should consider including the sql header at the materialization level instead. --#}
{%- set predicates = [] if predicates is none else [] + predicates -%}
{%- set dest_cols_names = get_quoted_list(dest_columns | map(attribute="name")) -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{%- set sql_header = config.get('sql_header', none) -%}
{{ sql_header if sql_header is not none and include_sql_header }}
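{#-- The DELETE/TRUNCATE is executed immediately through statement("main"); only the trailing INSERT is returned as the rendered SQL for the materialization to run. --#}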
{% call statement("main") %}
{% if predicates %}
DELETE FROM {{ target }} WHERE TRUE
AND {{ predicates | join(' AND ') }};
{% else %}
TRUNCATE TABLE {{ target }};
{% endif %}
{% endcall %}
INSERT INTO {{ target }} ({{ dest_cols_csv }})
SELECT {{ dest_cols_csv }}
FROM {{ source }}
{% endmacro %}