diff --git a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
index c4dcb1b8..1dc1566f 100644
--- a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
+++ b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
@@ -220,23 +220,44 @@
         {% endcall %}
     {% endif %}
 
-    {% call statement('delete_existing_data') %}
-        {% if is_distributed %}
-            {% set existing_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if existing_relation is not none else none %}
-            delete from {{ existing_local }} {{ on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in (select {{ unique_key }}
-                                                                                                                     from {{ inserting_relation }})
-        {% else %}
-            delete from {{ existing_relation }} where ({{ unique_key }}) in (select {{ unique_key }}
-                                                                             from {{ inserting_relation }})
+    {% set delete_filter %}
+        select distinct {{ unique_key }} from {{ inserting_relation }}
+    {% endset %}
+    {% set data_to_delete_count_query %}
+        select count(*) from {{ existing_relation }} where ({{ unique_key }}) global in ({{ delete_filter }})
+    {% endset %}
+    {% set data_to_delete_count = run_query(data_to_delete_count_query).rows[0].values()[0] %}
+    {% if data_to_delete_count > 0 %}
+        {{ log(data_to_delete_count ~ " rows to be deleted.", info=True) }}
+        {% set unique_key_query %}
+            -- https://github.com/ClickHouse/ClickHouse/issues/69559
+            select count(distinct {{ unique_key }}) from {{ inserting_relation }}
+        {% endset %}
+        {% set unique_key_count = run_query(unique_key_query).rows[0].values()[0] %}
+        {% if unique_key_count == 1 %}
+            {% set query %}
+                select toString(any(tuple({{ unique_key }}))) from {{ inserting_relation }}
+            {% endset %}
+            {% set delete_filter = run_query(query).rows[0].values()[0] %}
+            {{ log('Delete filter: ' ~ delete_filter) }}
         {% endif %}
-        {%- if incremental_predicates %}
-            {% for predicate in incremental_predicates %}
-                and {{ predicate }}
-            {% endfor %}
-        {%- endif -%}
-        {{ adapter.get_model_query_settings(model) }}
-    {% endcall %}
-
+        {% call statement('delete_existing_data') %}
+            {% if is_distributed %}
+                {% set existing_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if existing_relation is not none else none %}
+                delete from {{ existing_local }} {{ on_cluster_clause(existing_relation) }} where ({{ unique_key }}) in ({{ delete_filter }})
+            {% else %}
+                delete from {{ existing_relation }} where ({{ unique_key }}) in ({{ delete_filter }})
+            {% endif %}
+            {%- if incremental_predicates %}
+                {% for predicate in incremental_predicates %}
+                    and {{ predicate }}
+                {% endfor %}
+            {%- endif -%}
+            {{ adapter.get_model_query_settings(model) }}
+        {% endcall %}
+    {% else %}
+        {{ log("No data to be deleted, skip lightweight delete.", info=True) }}
+    {% endif %}
     {%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%}
     {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
     {% call statement('insert_new_data') %}
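Note on the single-key fast path above: when the inserted batch contains exactly one distinct unique key, the macro swaps the `in (subquery)` filter for a precomputed literal tuple, working around the ClickHouse issue linked in the macro. A minimal sketch of what this compiles to, with hypothetical `stage` and `target` tables standing in for {{ inserting_relation }} and {{ existing_relation }}:

    -- toString over a tuple renders a literal such as (1,'2024-10-21')
    select toString(any(tuple(key, date))) from stage;
    -- the delete then runs with a constant filter instead of a subquery
    delete from target where (key, date) in ((1,'2024-10-21'));
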
diff --git a/tests/integration/adapter/incremental/test_distributed_incremental.py b/tests/integration/adapter/incremental/test_distributed_incremental.py
index f132933d..32349b7c 100644
--- a/tests/integration/adapter/incremental/test_distributed_incremental.py
+++ b/tests/integration/adapter/incremental/test_distributed_incremental.py
@@ -8,7 +8,7 @@
     seeds_base_csv,
 )
 from dbt.tests.adapter.basic.test_incremental import BaseIncremental, BaseIncrementalNotSchemaChange
-from dbt.tests.util import run_dbt
+from dbt.tests.util import run_dbt, run_dbt_and_capture
 
 from tests.integration.adapter.incremental.test_base_incremental import uniq_schema
 
@@ -56,15 +56,6 @@ def test_simple_incremental(self, project):
         run_dbt(["run", "--select", "unique_source_one"])
         run_dbt(["run", "--select", "unique_incremental_one"])
 
-
-lw_delete_schema = """
-version: 2
-
-models:
-  - name: "lw_delete_inc"
-    description: "Incremental table"
-"""
-
 lw_delete_inc = """
 {{ config(
         materialized='distributed_incremental',
         order_by=['key'],
         unique_key='key',
         incremental_strategy='delete+insert'
     )
 }}
 {% if is_incremental() %}
@@ -81,23 +72,93 @@ def test_simple_incremental(self, project):
 {% endif %}
 """
 
+lw_delete_no_op = """
+{{ config(
+        materialized='distributed_incremental',
+        order_by=['key'],
+        unique_key='key',
+        incremental_strategy='delete+insert'
+    )
+}}
+{% if is_incremental() %}
+    SELECT toUInt64(number) as key FROM numbers(50, 10)
+{% else %}
+    SELECT toUInt64(number) as key FROM numbers(10)
+{% endif %}
+"""
+
+LW_DELETE_UNIQUE_KEY_COMPILATION = """
+{{ config(
+        materialized='distributed_incremental',
+        order_by=['key'],
+        unique_key='key',
+        incremental_strategy='delete+insert'
+    )
+}}
+SELECT 1 as key
+UNION ALL
+SELECT 1 as key
+"""
+
+LW_DELETE_COMPOSITE_UNIQUE_KEY_COMPILATION = """
+{{ config(
+        materialized='distributed_incremental',
+        order_by=['key'],
+        unique_key=['key', 'date'],
+        incremental_strategy='delete+insert'
+    )
+}}
+SELECT 1 as key, toDate('2024-10-21') as date
+UNION ALL
+SELECT 1 as key, toDate('2024-10-21') as date
+"""
+
+
+@pytest.mark.skipif(os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster')
 class TestLWDeleteDistributedIncremental:
     @pytest.fixture(scope="class")
     def models(self):
-        return {"lw_delete_inc.sql": lw_delete_inc}
+        return {
+            "lw_delete_inc.sql": lw_delete_inc,
+            'lw_delete_no_op.sql': lw_delete_no_op,
+            'lw_delete_unique_key_compilation.sql': LW_DELETE_UNIQUE_KEY_COMPILATION,
+            'lw_delete_composite_unique_key_compilation.sql': LW_DELETE_COMPOSITE_UNIQUE_KEY_COMPILATION,
+        }
 
-    @pytest.mark.skipif(
-        os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster'
-    )
-    def test_lw_delete(self, project):
-        run_dbt()
+    @pytest.mark.parametrize("model", ["lw_delete_inc"])
+    def test_lw_delete(self, project, model):
+        run_dbt(["run", "--select", model])
         result = project.run_sql("select count(*) as num_rows from lw_delete_inc", fetch="one")
         assert result[0] == 100
-        run_dbt()
+        _, log = run_dbt_and_capture(["run", "--select", model])
         result = project.run_sql("select count(*) as num_rows from lw_delete_inc", fetch="one")
+        assert '20 rows to be deleted.' in log
         assert result[0] == 180
 
+    @pytest.mark.parametrize("model", ["lw_delete_no_op"])
+    def test_lw_delete_no_op(self, project, model):
+        run_dbt(["run", "--select", model])
+        _, log = run_dbt_and_capture(["run", "--select", model])
+        # assert that no delete query is issued against table lw_delete_no_op
+        assert 'rows to be deleted.' not in log
+        assert 'No data to be deleted, skip lightweight delete.' in log
+
+    @pytest.mark.parametrize(
+        "model,delete_filter_log",
+        [
+            ("lw_delete_unique_key_compilation", "Delete filter: (1)"),
+            ("lw_delete_composite_unique_key_compilation", "Delete filter: (1,'2024-10-21')"),
+        ],
+    )
+    def test_lw_delete_unique_key(self, project, model, delete_filter_log):
+        """Assure that the delete_filter in `DELETE FROM