Skip to content

Commit

Permalink
Fix: Support for unique key expressions for SCD2 by column models (#3075)
Browse files Browse the repository at this point in the history
  • Loading branch information
izeigerman committed Aug 30, 2024
1 parent 0181bb5 commit fcc082b
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 4 deletions.
9 changes: 5 additions & 4 deletions sqlmesh/core/engine_adapter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1429,11 +1429,12 @@ def remove_managed_columns(
)
row_value_check = exp.or_(*row_check_conditions)
unique_key_conditions = []
for col in unique_key:
t_col = col.copy()
t_col.this.set("this", f"t_{col.name}")
for key in unique_key:
t_key = key.copy()
for col in t_key.find_all(exp.Column):
col.this.set("this", f"t_{col.name}")
unique_key_conditions.extend(
[t_col.is_(exp.Null()).not_(), col.is_(exp.Null()).not_()]
[t_key.is_(exp.Null()).not_(), key.is_(exp.Null()).not_()]
)
unique_key_check = exp.and_(*unique_key_conditions)
# unique_key_check is saying "if the row is updated"
Expand Down
192 changes: 192 additions & 0 deletions tests/core/engine_adapter/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1719,6 +1719,198 @@ def test_scd_type_2_by_column(make_mocked_engine_adapter: t.Callable):
)


def test_scd_type_2_by_column_composite_key(make_mocked_engine_adapter: t.Callable):
    """Check the SCD type 2 (by column) SQL when the unique key is an expression.

    The unique key passed in is ``CONCAT(id_a, id_b)`` rather than a bare
    column. The expected statement below uses that expression for the
    DISTINCT ON, join and GROUP BY clauses, and in the "row was updated" check
    it appears both as-is and with every inner column renamed to its
    ``t_``-prefixed counterpart.
    """
    engine_adapter = make_mocked_engine_adapter(EngineAdapter)

    source_query = t.cast(exp.Select, parse_one("SELECT id_a, id_b, name, price FROM source"))
    # A single key made of a function call over two columns.
    composite_key = exp.func("CONCAT", exp.column("id_a"), exp.column("id_b"))
    column_types = {
        column_name: exp.DataType.build(type_name)
        for column_name, type_name in (
            ("id_a", "VARCHAR"),
            ("id_b", "VARCHAR"),
            ("name", "VARCHAR"),
            ("price", "DOUBLE"),
            ("test_VALID_from", "TIMESTAMP"),
            ("test_valid_to", "TIMESTAMP"),
        )
    }

    engine_adapter.scd_type_2_by_column(
        target_table="target",
        source_table=source_query,
        unique_key=[composite_key],
        valid_from_col=exp.column("test_VALID_from", quoted=True),
        valid_to_col=exp.column("test_valid_to", quoted=True),
        check_columns=[exp.column("name"), exp.column("price")],
        columns_to_types=column_types,
        execution_time=datetime(2020, 1, 1, 0, 0, 0),
    )

    # First positional argument of the last recorded `cursor.execute` call.
    executed_sql = engine_adapter.cursor.execute.call_args[0][0]
    # Round-trip the expected text through parse_one(...).sql() so the
    # comparison is made against generated SQL, not the literal's layout.
    expected_sql = parse_one(
        """
CREATE OR REPLACE TABLE "target" AS
WITH "source" AS (
SELECT DISTINCT ON (CONCAT("id_a", "id_b"))
TRUE AS "_exists",
"id_a",
"id_b",
"name",
"price",
FROM (
SELECT
"id_a",
"id_b",
"name",
"price"
FROM "source"
) AS "raw_source"
), "static" AS (
SELECT
"id_a",
"id_b",
"name",
"price",
"test_VALID_from",
"test_valid_to",
TRUE AS "_exists"
FROM "target"
WHERE
NOT "test_valid_to" IS NULL
), "latest" AS (
SELECT
"id_a",
"id_b",
"name",
"price",
"test_VALID_from",
"test_valid_to",
TRUE AS "_exists"
FROM "target"
WHERE
"test_valid_to" IS NULL
), "deleted" AS (
SELECT
"static"."id_a",
"static"."id_b",
"static"."name",
"static"."price",
"static"."test_VALID_from",
"static"."test_valid_to"
FROM "static"
LEFT JOIN "latest"
ON CONCAT("static"."id_a", "static"."id_b") = CONCAT("latest"."id_a", "latest"."id_b")
WHERE
"latest"."test_valid_to" IS NULL
), "latest_deleted" AS (
SELECT
TRUE AS "_exists",
CONCAT("id_a", "id_b") AS "_key0",
MAX("test_valid_to") AS "test_valid_to"
FROM "deleted"
GROUP BY
CONCAT("id_a", "id_b")
), "joined" AS (
SELECT
"source"."_exists" AS "_exists",
"latest"."id_a" AS "t_id_a",
"latest"."id_b" AS "t_id_b",
"latest"."name" AS "t_name",
"latest"."price" AS "t_price",
"latest"."test_VALID_from" AS "t_test_VALID_from",
"latest"."test_valid_to" AS "t_test_valid_to",
"source"."id_a" AS "id_a",
"source"."id_b" AS "id_b",
"source"."name" AS "name",
"source"."price" AS "price"
FROM "latest"
LEFT JOIN "source"
ON CONCAT("latest"."id_a", "latest"."id_b") = CONCAT("source"."id_a", "source"."id_b")
UNION ALL
SELECT
"source"."_exists" AS "_exists",
"latest"."id_a" AS "t_id_a",
"latest"."id_b" AS "t_id_b",
"latest"."name" AS "t_name",
"latest"."price" AS "t_price",
"latest"."test_VALID_from" AS "t_test_VALID_from",
"latest"."test_valid_to" AS "t_test_valid_to",
"source"."id_a" AS "id_a",
"source"."id_b" AS "id_b",
"source"."name" AS "name",
"source"."price" AS "price"
FROM "latest"
RIGHT JOIN "source"
ON CONCAT("latest"."id_a", "latest"."id_b") = CONCAT("source"."id_a", "source"."id_b")
WHERE
"latest"."_exists" IS NULL
), "updated_rows" AS (
SELECT
COALESCE("joined"."t_id_a", "joined"."id_a") AS "id_a",
COALESCE("joined"."t_id_b", "joined"."id_b") AS "id_b",
COALESCE("joined"."t_name", "joined"."name") AS "name",
COALESCE("joined"."t_price", "joined"."price") AS "price",
COALESCE("t_test_VALID_from", CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AS "test_VALID_from",
CASE
WHEN "joined"."_exists" IS NULL
OR (
(
NOT CONCAT("t_id_a", "t_id_b") IS NULL AND NOT CONCAT("id_a", "id_b") IS NULL
)
AND (
"name" <> "t_name"
OR (
"t_name" IS NULL AND NOT "name" IS NULL
)
OR (
NOT "t_name" IS NULL AND "name" IS NULL
)
OR "price" <> "t_price"
OR (
"t_price" IS NULL AND NOT "price" IS NULL
)
OR (
NOT "t_price" IS NULL AND "price" IS NULL
)
)
)
THEN CAST('2020-01-01 00:00:00' AS TIMESTAMP)
ELSE "t_test_valid_to"
END AS "test_valid_to"
FROM "joined"
LEFT JOIN "latest_deleted"
ON CONCAT("joined"."id_a", "joined"."id_b") = "latest_deleted"."_key0"
), "inserted_rows" AS (
SELECT
"id_a",
"id_b",
"name",
"price",
CAST('2020-01-01 00:00:00' AS TIMESTAMP) AS "test_VALID_from",
CAST(NULL AS TIMESTAMP) AS "test_valid_to"
FROM "joined"
WHERE
(
NOT CONCAT("t_id_a", "t_id_b") IS NULL AND NOT CONCAT("id_a", "id_b") IS NULL
)
AND (
"name" <> "t_name"
OR (
"t_name" IS NULL AND NOT "name" IS NULL
)
OR (
NOT "t_name" IS NULL AND "name" IS NULL
)
OR "price" <> "t_price"
OR (
"t_price" IS NULL AND NOT "price" IS NULL
)
OR (
NOT "t_price" IS NULL AND "price" IS NULL
)
)
)
SELECT CAST("id_a" AS VARCHAR) AS "id_a", CAST("id_b" AS VARCHAR) AS "id_b", CAST("name" AS VARCHAR) AS "name", CAST("price" AS DOUBLE) AS "price", CAST("test_VALID_from" AS TIMESTAMP) AS "test_VALID_from", CAST("test_valid_to" AS TIMESTAMP) AS "test_valid_to" FROM (SELECT "id_a", "id_b", "name", "price", "test_VALID_from", "test_valid_to" FROM "static" UNION ALL SELECT "id_a", "id_b", "name", "price", "test_VALID_from", "test_valid_to" FROM "updated_rows" UNION ALL SELECT "id_a", "id_b", "name", "price", "test_VALID_from", "test_valid_to" FROM "inserted_rows") AS "_subquery"
"""
    ).sql()

    assert executed_sql == expected_sql


def test_scd_type_2_truncate(make_mocked_engine_adapter: t.Callable):
adapter = make_mocked_engine_adapter(EngineAdapter)

Expand Down

0 comments on commit fcc082b

Please sign in to comment.