dbt/include/maxcompute/macros/materializations/incremental/incremental.sql (114 lines of code) (raw):

{#-
  Incremental materialization for the MaxCompute adapter, plus the helper
  macros it uses to assemble the incremental build SQL.

  Flow of the materialization:
    1. Read model configs and resolve the target / temp / backup relations.
    2. Drop leftover temp and backup relations, then run pre-hooks.
    3. Build the model through one of three paths:
         - target does not exist  -> create the table from the model SQL
         - full refresh requested -> drop the existing relation and recreate it
         - otherwise              -> build incrementally with the configured strategy
    4. Apply grants, persist docs, run post-hooks, drop the temp relation if
       this materialization created it, and return the target relation.
-#}
{% materialization incremental, adapter='maxcompute' -%}

  {#-- model configuration --#}
  {%- set raw_partition_by = config.get('partition_by', none) -%}
  {%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
  {%- set partitions = config.get('partitions', none) -%}
  {%- set lifecycle = config.get('lifecycle', none) -%}
  {#-- 'predicates' takes precedence; 'incremental_predicates' is the fallback --#}
  {%- set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) -%}
  {%- set cluster_by = config.get('cluster_by', none) -%}
  {%- set tblproperties = config.get('tblproperties', none) -%}
  {#-- strategy falls back to 'merge' when none is configured --#}
  {%- set incremental_strategy = config.get('incremental_strategy') or 'merge' -%}
  {%- set sql_hints = config.get('sql_hints', none) -%}
  {%- set sql_header = merge_sql_hints_and_header(sql_hints, config.get('sql_header', none)) -%}

  {#-- relations --#}
  {%- set existing_relation = load_cached_relation(this) -%}
  {%- set target_relation = this.incorporate(type='table') -%}
  {%- set temp_relation = make_temp_relation(target_relation) -%}
  {%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
  {%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}

  {#-- unique key, normalised to a list (only used for the validation below) --#}
  {%- set unique_key = config.get('unique_key') -%}
  {%- if unique_key is string -%}
    {%- set unique_key_list = unique_key.split(',') -%}
  {%- elif unique_key is iterable -%}
    {%- set unique_key_list = unique_key -%}
  {%- else -%}
    {%- set unique_key_list = [] -%}
  {%- endif -%}

  {#-- an existing view at the target location is handled like a full refresh --#}
  {%- set full_refresh_mode = (should_full_refresh() or existing_relation.is_view) -%}
  {%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%}

  {#-- 'append' combined with a unique key is rejected up front --#}
  {% if unique_key_list|length > 0 and incremental_strategy == 'append' %}
    {% do exceptions.raise_compiler_error('append strategy is not supported for incremental models with a unique key when using MaxCompute') %}
  {% endif %}

  {#--
    The temp_ and backup_ relations should not already exist in the database;
    the cache lookup yields none in that case. Otherwise we get a relation that
    is dropped below, before its name is reused for the current operation.
  --#}
  {%- set preexisting_temp_relation = load_cached_relation(temp_relation) -%}
  {%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%}

  {#-- grab the current grants config for comparison later on --#}
  {% set grant_config = config.get('grants') %}

  {{ drop_relation_if_exists(preexisting_temp_relation) }}
  {{ drop_relation_if_exists(preexisting_backup_relation) }}

  {{ run_hooks(pre_hooks) }}

  {% if existing_relation is none %}
    {#-- first run: build the target table directly from the model SQL --#}
    {%- call statement('main') -%}
      {{ create_table_as_internal(False, target_relation, sql, True, partition_config=partition_by, lifecycle=lifecycle, tblproperties=tblproperties) }}
    {%- endcall -%}

  {% elif full_refresh_mode %}
    {#-- full refresh: drop whatever is there and rebuild from scratch --#}
    {% do log("Hard refreshing " ~ existing_relation) %}
    {{ adapter.drop_relation(existing_relation) }}
    {%- call statement('main') -%}
      {{ create_table_as_internal(False, target_relation, sql, True, partition_config=partition_by, lifecycle=lifecycle, tblproperties=tblproperties) }}
    {%- endcall -%}

  {% else %}
    {#-- incremental run against an existing table --#}
    {% set temp_relation_exists = false %}

    {% if on_schema_change != 'ignore' %}
      {#-- Schema comparison needs the new data materialised, so the temp relation is built here --#}
      {#-- NOTE(review): first argument True presumably marks the table as temporary; confirm against create_table_as_internal --#}
      {%- call statement('create_temp_relation') -%}
        {{ create_table_as_internal(True, temp_relation, sql, True, partition_config=partition_by, tblproperties=tblproperties) }}
      {%- endcall -%}
      {% set temp_relation_exists = true %}
      {#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
      {% set dest_columns = process_schema_changes(on_schema_change, temp_relation, existing_relation) %}
    {% endif %}

    {#-- dest_columns is unset when schema changes are ignored; fall back to the existing table's columns --#}
    {% if not dest_columns %}
      {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
    {% endif %}

    {% set build_sql = mc_generate_incremental_build_sql(
        incremental_strategy,
        temp_relation,
        target_relation,
        sql,
        unique_key,
        partition_by,
        partitions,
        dest_columns,
        temp_relation_exists,
        incremental_predicates,
        tblproperties
    ) %}

    {% call statement("main") %}
      {{ sql_header if sql_header is not none }}
      {{ build_sql }}
    {% endcall %}
  {% endif %}

  {% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
  {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

  {% do persist_docs(target_relation, model) %}

  {{ run_hooks(post_hooks) }}

  {#--
    Only a temp relation created by this materialization (schema-change path)
    is dropped here.
    NOTE(review): a temp relation created inside mc_generate_incremental_build_sql
    does not flip this flag and is therefore not dropped here; confirm it is
    cleaned up by other means.
  --#}
  {%- if temp_relation_exists -%}
    {{ adapter.drop_relation(temp_relation) }}
  {%- endif -%}

  {{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}


{#-
  Build the SQL that loads new data into target_relation for the given strategy.

  Args:
    strategy:               incremental strategy name.
    temp_relation:          relation used to stage the model SQL.
    target_relation:        relation being updated.
    sql:                    compiled model SQL.
    unique_key:             unique key config as provided by the model.
    partition_by:           parsed partition config.
    partitions:             'partitions' config value.
    dest_columns:           columns to write into the target.
    temp_relation_exists:   true when the caller has already built temp_relation.
    incremental_predicates: extra predicates forwarded to the strategy macro.
    tblproperties:          table properties forwarded to table creation.

  Returns:
    The build SQL produced by the strategy-specific macro.

  'insert_overwrite' and 'microbatch' are delegated to dedicated MaxCompute
  macros. Every other strategy stages the model SQL in temp_relation (unless
  the caller already did) and delegates to the macro resolved through
  adapter.get_incremental_strategy_macro.
-#}
{% macro mc_generate_incremental_build_sql(
    strategy,
    temp_relation,
    target_relation,
    sql,
    unique_key,
    partition_by,
    partitions,
    dest_columns,
    temp_relation_exists,
    incremental_predicates,
    tblproperties
) %}
  {% if strategy == 'insert_overwrite' %}
    {% set build_sql = mc_generate_incremental_insert_overwrite_build_sql(
        temp_relation,
        target_relation,
        sql,
        unique_key,
        partition_by,
        partitions,
        dest_columns,
        temp_relation_exists,
        tblproperties
    ) %}

  {% elif strategy == 'microbatch' %}
    {% set build_sql = mc_generate_microbatch_build_sql(
        temp_relation,
        target_relation,
        sql,
        unique_key,
        partition_by,
        partitions,
        dest_columns,
        temp_relation_exists,
        tblproperties
    ) %}

  {% else %}
    {#-- all remaining strategies go through the standard dbt strategy macros --#}
    {#-- The guard sits outside the statement call so that no empty statement is submitted when the temp relation already exists --#}
    {% if not temp_relation_exists %}
      {%- call statement('create_temp_relation') -%}
        {{ create_table_as_internal(True, temp_relation, sql, True, partition_config=partition_by, tblproperties=tblproperties) }}
      {%- endcall -%}
    {% endif %}

    {% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, strategy) %}
    {% set strategy_arg_dict = ({
        'target_relation': target_relation,
        'temp_relation': temp_relation,
        'unique_key': unique_key,
        'dest_columns': dest_columns,
        'incremental_predicates': incremental_predicates
    }) %}
    {% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %}
  {% endif %}

  {{ return(build_sql) }}
{% endmacro %}


{#-
  Quote every name in column_names with adapter.quote.

  Args:
    column_names: iterable of column names.

  Returns:
    A new list with the quoted names, in the same order.
-#}
{% macro get_quoted_list(column_names) %}
  {% set quoted = [] %}
  {% for col in column_names -%}
    {%- do quoted.append(adapter.quote(col)) -%}
  {%- endfor %}
  {{ return(quoted) }}
{% endmacro %}


{#-
  Microbatch strategy SQL for MaxCompute.

  Args:
    arg_dict: strategy argument dict; its "unique_key" entry must be set.

  Returns:
    The result of the dispatched get_incremental_merge_sql macro.

  Raises:
    A compiler error when arg_dict has no unique_key.
-#}
{% macro maxcompute__get_incremental_microbatch_sql(arg_dict) %}
  {% if arg_dict["unique_key"] %}
    {% do return(adapter.dispatch('get_incremental_merge_sql', 'dbt')(arg_dict)) %}
  {% else %}
    {{ exceptions.raise_compiler_error("dbt-maxcompute 'microbatch' requires a `unique_key` config") }}
  {% endif %}
{% endmacro %}