Skip to content

Commit

Permalink
Fix: Restatement of models that are present in prod but are missing in dev (#3589)
Browse files Browse the repository at this point in the history
  • Loading branch information
izeigerman authored Jan 6, 2025
1 parent 00143b5 commit 34b9a1a
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 0 deletions.
2 changes: 2 additions & 0 deletions sqlmesh/core/plan/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,8 @@ def _restatement_intervals_across_all_environments(
env_dag = DAG({s.name: {p.name for p in s.parents} for s in env.snapshots})

for restatement, intervals in prod_restatements.items():
if restatement not in keyed_snapshots:
continue
affected_snapshot_names = [restatement] + env_dag.downstream(restatement)
snapshots_to_restate.update(
{(keyed_snapshots[a], intervals) for a in affected_snapshot_names}
Expand Down
93 changes: 93 additions & 0 deletions tests/core/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2457,6 +2457,99 @@ def _dates_in_table(table_name: str) -> t.List[str]:
]


def test_prod_restatement_plan_missing_model_in_dev(
    tmp_path: Path,
):
    """
    Restating a prod model that a dev environment lacks must not fail.

    Scenario:
        Model B exists in prod, while dev only contains model A.
        B is restated in prod.
    Outcome:
        Model A should be ignored and the plan shouldn't fail.

    The test has no explicit assertions: it passes as long as none of the
    plan applications raises.
    """

    # Hourly model; it is the only model present when the dev plan is applied.
    model_a_sql = """
MODEL (
name test.a,
kind INCREMENTAL_BY_TIME_RANGE (
time_column "ts"
),
start '2024-01-01 00:00:00',
cron '@hourly'
);
select account_id, ts from test.external_table;
"""

    # Daily model; it replaces A on disk before the plans without an
    # environment name are applied.
    model_b_sql = """
MODEL (
name test.b,
kind INCREMENTAL_BY_TIME_RANGE (
time_column ts
),
cron '@daily'
);
select account_id, ts from test.external_table where ts between @start_ts and @end_ts;
"""

    models_dir = tmp_path / "models"
    models_dir.mkdir()
    model_a_path = models_dir / "a.sql"
    model_b_path = models_dir / "b.sql"

    model_a_path.write_text(model_a_sql)

    context = Context(
        paths=[tmp_path],
        config=Config(model_defaults=ModelDefaultsConfig(dialect="duckdb")),
    )

    adapter = context.engine_adapter
    adapter.create_schema("test")

    # Source data that both models select from.
    source_rows = pd.DataFrame(
        {
            "account_id": [1001, 1002, 1003, 1004],
            "ts": [
                "2024-01-01 00:30:00",
                "2024-01-01 01:30:00",
                "2024-01-01 02:30:00",
                "2024-01-02 00:30:00",
            ],
        }
    )
    source_schema = {
        "account_id": exp.DataType.build("int"),
        "ts": exp.DataType.build("timestamp"),
    }
    source_table = exp.table_(table="external_table", db="test", quoted=True)
    adapter.create_table(table_name=source_table, columns_to_types=source_schema)
    adapter.insert_append(
        table_name=source_table,
        query_or_df=source_rows,
        columns_to_types=source_schema,
    )

    # Every plan in this test is applied immediately and without prompts.
    apply_now = {"auto_apply": True, "no_prompts": True}

    # Plan + apply A[hourly] in dev.
    context.plan("dev", **apply_now)

    # Swap the project contents: add B[daily] and remove A.
    model_b_path.write_text(model_b_sql)
    model_a_path.unlink()

    # Reload the project and plan + apply it. No environment name is passed
    # here (unlike the "dev" plan above); per the scenario this is the prod plan.
    context.load()
    context.plan(**apply_now)

    # Restate B in prod; dev was never given B, and this must still succeed.
    context.plan(
        restate_models=["test.b"],
        start="2024-01-01",
        end="2024-01-02",
        **apply_now,
    )


@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_plan_against_expired_environment(init_and_plan_context: t.Callable):
context, plan = init_and_plan_context("examples/sushi")
Expand Down

0 comments on commit 34b9a1a

Please sign in to comment.