dbt/include/maxcompute/macros/materializations/incremental/merge.sql (101 lines of code) (raw):

{#--
  maxcompute__get_merge_sql

  Renders the SQL for the "merge" incremental strategy.

  Args:
    target: relation that receives the rows (aliased DBT_INTERNAL_DEST).
    source: relation holding the new rows (aliased DBT_INTERNAL_SOURCE).
    unique_key: a single column name, or a list of column names, used to
      match source rows to target rows. When falsy, no MERGE is emitted and
      the macro renders a plain INSERT ... SELECT instead.
    dest_columns: column objects; only their `name` attribute is read here.
    incremental_predicates: optional list of extra SQL conditions that are
      AND-ed into the MERGE `on` clause (ignored in the INSERT-only branch).

  Model config read: merge_update_columns, merge_exclude_columns, sql_header.
--#}
{% macro maxcompute__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none) -%}
    {#-- `[] + list` builds a new list, so the appends below never touch the caller's list. --#}
    {%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%}
    {%- set dest_cols_names = get_quoted_list(dest_columns | map(attribute="name")) -%}
    {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
    {%- set merge_update_columns = config.get('merge_update_columns') -%}
    {%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
    {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}
    {%- set sql_header = config.get('sql_header', none) -%}

    {{ sql_header if sql_header is not none }}

    {% if unique_key %}
        {# One equality predicate per key column; strings and mappings are treated as a single key. #}
        {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
            {% for key in unique_key %}
                {% set this_key_match %}
                    DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }}
                {% endset %}
                {% do predicates.append(this_key_match) %}
            {% endfor %}
        {% else %}
            {% set unique_key_match %}
                DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
            {% endset %}
            {% do predicates.append(unique_key_match) %}
        {% endif %}

        merge into {{ target }} as DBT_INTERNAL_DEST
            using {{ source }} as DBT_INTERNAL_SOURCE
            on {{"(" ~ predicates | join(") and (") ~ ")"}}

        when matched then update set
            {% for column_name in update_columns -%}
                DBT_INTERNAL_DEST.{{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
                {%- if not loop.last %}, {%- endif %}
            {%- endfor %}

        when not matched then insert
            ({{ dest_cols_csv }})
        values (
            {% for column in dest_cols_names %}
                DBT_INTERNAL_SOURCE.{{ column }} {{- ',' if not loop.last -}}
            {% endfor %});
    {% else %}
        {# No unique key: nothing to match on, so append every source row. #}
        INSERT INTO {{ target }} ({{ dest_cols_csv }})
        SELECT {{ dest_cols_csv }}
        FROM {{ source }}
    {% endif %}
{% endmacro %}


{#--
  maxcompute__get_delete_insert_merge_sql

  Renders the SQL for the "delete+insert" incremental strategy: an optional
  DELETE of target rows that match the source on `unique_key`, followed by an
  INSERT of every source row.

  Args:
    target: relation that rows are deleted from and inserted into.
    source: relation holding the new rows.
    unique_key: a single column name/expression, or a list of column names.
      When falsy, the DELETE is skipped and only the INSERT is rendered.
    dest_columns: column objects; only their `name` attribute is read here.
    incremental_predicates: optional list of extra SQL conditions AND-ed onto
      the DELETE's where clause.
--#}
{% macro maxcompute__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
    {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

    {% if unique_key %}
        {% if unique_key is sequence and unique_key is not string %}
            {# Composite key: match source and target on every key column. #}
            delete from {{target }}
            using {{ source }}
            where (
                {% for key in unique_key %}
                    {{ source }}.{{ key }} = {{ target }}.{{ key }}
                    {{ "and " if not loop.last}}
                {% endfor %}
                {% if incremental_predicates %}
                    {% for predicate in incremental_predicates %}
                        and {{ predicate }}
                    {% endfor %}
                {% endif %}
            );
        {% else %}
            {# Single key: delete target rows whose key appears in the source. #}
            delete from {{ target }}
            where (
                {{ unique_key }}) in (
                select ({{ unique_key }})
                from {{ source }}
            )
            {%- if incremental_predicates %}
                {% for predicate in incremental_predicates %}
                    and {{ predicate }}
                {% endfor %}
            {%- endif -%};
        {% endif %}
    {% endif %}

    insert into {{ target }} ({{ dest_cols_csv }})
    (
        select {{ dest_cols_csv }}
        from {{ source }}
    )
{%- endmacro %}


{#--
  maxcompute__get_insert_overwrite_merge_sql

  Renders the SQL for the "insert_overwrite" incremental strategy. The
  clearing step (DELETE restricted by `predicates`, or TRUNCATE when there are
  none) is wrapped in a `statement("main")` call block; the macro's own output
  is the INSERT ... SELECT that reloads the target from the source.

  Args:
    target: relation to clear and reload.
    source: relation holding the replacement rows.
    dest_columns: column objects; only their `name` attribute is read here.
    predicates: optional list of SQL conditions limiting which target rows
      are deleted; none or empty means the whole table is truncated.
    include_sql_header: when truthy (and a `sql_header` config is set), the
      configured SQL header is rendered before the statements.
--#}
{% macro maxcompute__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) -%}
    {#-- The only time include_sql_header is True: --#}
    {#-- BigQuery + insert_overwrite strategy + "static" partitions config --#}
    {#-- We should consider including the sql header at the materialization level instead --#}
    {#-- NOTE(review): the BigQuery reference above looks stale for a MaxCompute adapter; confirm which callers pass include_sql_header=True. --#}
    {%- set predicates = [] if predicates is none else [] + predicates -%}
    {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
    {%- set sql_header = config.get('sql_header', none) -%}

    {{ sql_header if sql_header is not none and include_sql_header }}

    {% call statement("main") %}
        {% if predicates %}
            DELETE FROM {{ target }}
            where True
                AND {{ predicates | join(' AND ') }};
        {% else %}
            TRUNCATE TABLE {{ target }};
        {% endif %}
    {% endcall %}

    INSERT INTO {{ target }} ({{ dest_cols_csv }})
    SELECT {{ dest_cols_csv }}
    FROM {{ source }}
{% endmacro %}