dbt/include/maxcompute/macros/materializations/incremental/incremental_strategy/insert_overwrite.sql (68 lines of code) (raw):

{% macro mc_generate_incremental_insert_overwrite_build_sql( tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, tblproperties ) %} {% if partition_by is none %} {% set missing_partition_msg -%} The 'insert_overwrite' strategy requires the `partition_by` config. {%- endset %} {% do exceptions.raise_compiler_error(missing_partition_msg) %} {% endif %} {% if partition_by.fields|length != 1 %} {% set missing_partition_msg -%} The 'insert_overwrite' strategy requires the `partition_by` config. {%- endset %} {% do exceptions.raise_compiler_error(missing_partition_msg) %} {% endif %} {% set build_sql = mc_insert_overwrite_sql( tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, tblproperties ) %} {{ return(build_sql) }} {% endmacro %} {% macro mc_insert_overwrite_sql( tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, tblproperties ) %} {% if not tmp_relation_exists %} {%- call statement('create_tmp_relation') -%} {{ create_table_as_internal(True, tmp_relation, sql, True, partition_config=partition_by, tblproperties=tblproperties) }} {%- endcall -%} {% endif %} -- 3. run the merge statement {%- call statement('main') -%} {% if partitions is not none and partitions != [] %} {# static #} {{ mc_static_insert_overwrite_merge_sql(target_relation, tmp_relation, partition_by, partitions) }} {% else %} {# dynamic #} {{ mc_dynamic_insert_overwrite_sql(target_relation, tmp_relation, partition_by) }} {% endif %} {%- endcall -%} -- 4. clean up the temp table drop table if exists {{ tmp_relation }} {% endmacro %} {% macro mc_static_insert_overwrite_merge_sql(target, source, partition_by, partitions) -%} {%- set sql_header = config.get('sql_header', none) -%} {{ sql_header if sql_header is not none and include_sql_header }} {%- call statement('drop_static_partition') -%} DELETE FROM {{ target }} WHERE {{ partition_by.render(False) }} in ({{ partitions | join(',') }}) {%- endcall -%} INSERT OVERWRITE TABLE {{ target }} PARTITION({{ partition_by.render(False) }}) ( SELECT * FROM {{ source }} WHERE {{ partition_by.render(False) }} in ({{ partitions | join(',') }}) ) {% endmacro %} {% macro mc_dynamic_insert_overwrite_sql(target, source, partition_by) -%} {%- set sql_header = config.get('sql_header', none) -%} {{ sql_header if sql_header is not none and include_sql_header }} {% if partition_by.auto_partition() -%} INSERT OVERWRITE TABLE {{ target }} ( SELECT * FROM {{ source }} ) {%- else -%} INSERT OVERWRITE TABLE {{ target }} PARTITION({{ partition_by.render(False) }}) ( SELECT * FROM {{ source }} ) {%- endif -%} {% endmacro %}