dbt/include/maxcompute/macros/materializations/snapshots/snapshot.sql (124 lines of code) (raw):

-- Adapted from dbt-adapters/dbt/include/global_project/macros/materializations/snapshots/strategies.sql
-- The only change from upstream is casting to `string` instead of `varchar`.
{% macro maxcompute__snapshot_hash_arguments(args) -%}
    {#- Renders md5(<arg1> || '|' || <arg2> || ...). Each argument is cast to
        string and NULL is coalesced to '' so a NULL value still contributes
        to the hash instead of nulling out the whole concatenation. -#}
    md5({%- for arg in args -%}
        coalesce(cast({{ arg }} as string), '')
        {% if not loop.last %} || '|' || {% endif %}
    {%- endfor -%})
{%- endmacro %}

-- Adapted from dbt-adapters/dbt/include/global_project/macros/materializations/snapshots/strategies.sql
{% macro maxcompute__snapshot_string_as_time(timestamp) -%}
    {#- Returns the SQL expression to_timestamp('<timestamp>') for a timestamp string. -#}
    {%- set result = "to_timestamp('" ~ timestamp ~ "')" -%}
    {{ return(result) }}
{%- endmacro %}

{% macro build_snapshot_staging_table(strategy, sql, target_relation, tblproperties=none) %}
    {#- Materializes the snapshot staging query into a temp relation derived from
        target_relation and returns that temp relation.

        tblproperties: optional table properties forwarded to create_table_as_internal.
        It defaults to none (the same value the snapshot materialization passes when
        the `tblproperties` config is unset) so callers using the upstream
        3-argument signature (strategy, sql, target_relation) keep working. -#}
    {% set temp_relation = make_temp_relation(target_relation) %}
    {% set select = snapshot_staging_table(strategy, sql, target_relation) %}
    {% call statement('build_snapshot_staging_relation') %}
        {{ create_table_as_internal(True, temp_relation, select, True, tblproperties=tblproperties) }}
    {% endcall %}
    {% do return(temp_relation) %}
{% endmacro %}

-- Adapted from dbt-adapters/dbt/include/global_project/macros/materializations/snapshots/helper.sql
{% macro maxcompute__post_snapshot(staging_relation) %}
    {#- Drops the staging relation built by build_snapshot_staging_table;
        the materialization calls this after the merge has been committed. -#}
    {% do adapter.drop_relation(staging_relation) %}
{% endmacro %}

-- Adapted from dbt-adapters/dbt/include/global_project/macros/materializations/snapshots/helper.sql
-- The original method of adding 1 column at a time in a loop has been changed to adding all columns at once.
{% macro maxcompute__create_columns(relation, columns) %}
    {#- Adds every column in `columns` to `relation` using one
        ALTER TABLE ... ADD COLUMNS statement. No-op when the list is empty. -#}
    {% if columns | length > 0 %}
        {% call statement() %}
            alter table {{ relation.render() }} add columns (
                {% for column in columns %}
                    `{{ column.name }}` {{ column.data_type }} {{- ',' if not loop.last -}}
                {% endfor %}
            );
        {% endcall %}
    {% endif %}
{% endmacro %}

-- MaxCompute version of the snapshot MERGE: closes changed/deleted current rows
-- and inserts new versions from the staging relation.
{% macro maxcompute__snapshot_merge_sql(target, source, insert_cols) -%}
    {#- target: snapshot table relation; source: staging relation;
        insert_cols: already-quoted column names copied from source on insert.
        When `dbt_valid_to_current` is configured, current rows carry that value
        instead of NULL in dbt_valid_to, so both forms must be treated as current;
        otherwise such rows would never be closed. -#}
    {%- set insert_cols_csv = insert_cols | join(', ') -%}
    {%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%}
    {%- set valid_to_current = config.get("dbt_valid_to_current") -%}

    merge into {{ target.render() }} as DBT_INTERNAL_DEST
    using {{ source }} as DBT_INTERNAL_SOURCE
    on DBT_INTERNAL_SOURCE.{{ columns.dbt_scd_id }} = DBT_INTERNAL_DEST.{{ columns.dbt_scd_id }}

    when matched
        {% if valid_to_current %}
        and (DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} = {{ valid_to_current }}
             or DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null)
        {% else %}
        and DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null
        {% endif %}
        and DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete')
    then update set DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} = DBT_INTERNAL_SOURCE.{{ columns.dbt_valid_to }}

    when not matched
        and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
    then insert ({{ insert_cols_csv }})
    values (
        {% for column in insert_cols %}
            DBT_INTERNAL_SOURCE.{{ column }} {{- ',' if not loop.last -}}
        {% endfor %}
    );
{% endmacro %}

-- Adapted from dbt-adapters/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql
-- Create the snapshot table as a transactional table to support merge operations
{% materialization snapshot, adapter='maxcompute' %}
    {%- set target_table = model.get('alias', model.get('name')) -%}
    {%- set strategy_name = config.get('strategy') -%}
    {%- set unique_key = config.get('unique_key') -%}
    {#- grab current table's grants config for comparison later on -#}
    {%- set grant_config = config.get('grants') -%}
    {%- set tblproperties = config.get('tblproperties', none) -%}

    {% set target_relation_exists, target_relation = get_or_create_relation(
        database=model.database,
        schema=model.schema,
        identifier=target_table,
        type='table') -%}

    {%- if not target_relation.is_table -%}
        {% do exceptions.relation_wrong_type(target_relation, 'table') %}
    {%- endif -%}

    {{ run_hooks(pre_hooks, inside_transaction=False) }}
    {{ run_hooks(pre_hooks, inside_transaction=True) }}

    {% set strategy_macro = strategy_dispatch(strategy_name) %}
    {# The model['config'] parameter below is no longer used, but passing anyway for compatibility #}
    {# It was a dictionary of config, instead of the config object from the context #}
    {% set strategy = strategy_macro(model, "snapshotted_data", "source_data", model['config'], target_relation_exists) %}

    {% if not target_relation_exists %}
        {# First run: build the snapshot table directly from the snapshot query. #}
        {% set build_sql = build_snapshot_table(strategy, model['compiled_code']) %}
        {% set build_or_select_sql = build_sql %}
        {% set final_sql = create_table_as_internal(False, target_relation, build_sql, True, tblproperties=tblproperties) %}
    {% else %}
        {# Subsequent runs: stage the changes, align the schema, then MERGE. #}
        {% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %}
        {{ adapter.valid_snapshot_target(target_relation, columns) }}

        {% set build_or_select_sql = snapshot_staging_table(strategy, sql, target_relation) %}
        {% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation, tblproperties) %}

        {# this may no-op if the database does not require column expansion #}
        {% do adapter.expand_target_column_types(from_relation=staging_table, to_relation=target_relation) %}

        {# Helper columns that exist only in the staging relation and must never be
           added to, or inserted into, the target table. When unique_key is a list,
           upstream dbt stages one helper column per key (dbt_unique_key_1, _2, ...).
           NOTE(review): confirm against the snapshot_staging_table implementation in use. #}
        {% set remove_columns = ['dbt_change_type', 'DBT_CHANGE_TYPE', 'dbt_unique_key', 'DBT_UNIQUE_KEY'] %}
        {% if unique_key is sequence and unique_key is not string %}
            {% for _ in unique_key %}
                {% do remove_columns.append('dbt_unique_key_' ~ loop.index) %}
                {% do remove_columns.append('DBT_UNIQUE_KEY_' ~ loop.index) %}
            {% endfor %}
        {% endif %}

        {% set missing_columns = adapter.get_missing_columns(staging_table, target_relation)
                                 | rejectattr('name', 'in', remove_columns)
                                 | list %}
        {% do create_columns(target_relation, missing_columns) %}

        {% set source_columns = adapter.get_columns_in_relation(staging_table)
                                | rejectattr('name', 'in', remove_columns)
                                | list %}

        {% set quoted_source_columns = [] %}
        {% for column in source_columns %}
            {% do quoted_source_columns.append(adapter.quote(column.name)) %}
        {% endfor %}

        {% set final_sql = snapshot_merge_sql(
            target=target_relation,
            source=staging_table,
            insert_cols=quoted_source_columns
        ) %}
    {% endif %}

    {{ check_time_data_types(build_or_select_sql) }}

    {% call statement('main') %}
        {{ final_sql }}
    {% endcall %}

    {% set should_revoke = should_revoke(target_relation_exists, full_refresh_mode=False) %}
    {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

    {% do persist_docs(target_relation, model) %}

    {% if not target_relation_exists %}
        {% do create_indexes(target_relation) %}
    {% endif %}

    {{ run_hooks(post_hooks, inside_transaction=True) }}

    {{ adapter.commit() }}

    {# staging_table is only set on the incremental (merge) path. #}
    {% if staging_table is defined %}
        {% do post_snapshot(staging_table) %}
    {% endif %}

    {{ run_hooks(post_hooks, inside_transaction=False) }}

    {{ return({'relations': [target_relation]}) }}
{% endmaterialization %}