diff --git a/sqlmesh/core/plan/evaluator.py b/sqlmesh/core/plan/evaluator.py
index d106e0fe8..8a39c54c2 100644
--- a/sqlmesh/core/plan/evaluator.py
+++ b/sqlmesh/core/plan/evaluator.py
@@ -405,6 +405,8 @@ def _restatement_intervals_across_all_environments(
             env_dag = DAG({s.name: {p.name for p in s.parents} for s in env.snapshots})
 
             for restatement, intervals in prod_restatements.items():
+                if restatement not in keyed_snapshots:
+                    continue
                 affected_snapshot_names = [restatement] + env_dag.downstream(restatement)
                 snapshots_to_restate.update(
                     {(keyed_snapshots[a], intervals) for a in affected_snapshot_names}
diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py
index 3964b3f97..05185f05a 100644
--- a/tests/core/test_integration.py
+++ b/tests/core/test_integration.py
@@ -2457,6 +2457,97 @@ def _dates_in_table(table_name: str) -> t.List[str]:
     ]
 
 
+def test_prod_restatement_plan_missing_model_in_dev(
+    tmp_path: Path,
+):
+    """
+    Scenario:
+        I have a model B in prod but only model A in dev
+        I restate B in prod
+
+    Outcome:
+        The dev environment, which has no model B, should be ignored and the plan shouldn't fail
+    """
+
+    model_a = """
+    MODEL (
+        name test.a,
+        kind INCREMENTAL_BY_TIME_RANGE (
+            time_column "ts"
+        ),
+        start '2024-01-01 00:00:00',
+        cron '@hourly'
+    );
+
+    select account_id, ts from test.external_table;
+    """
+
+    model_b = """
+    MODEL (
+        name test.b,
+        kind INCREMENTAL_BY_TIME_RANGE (
+            time_column ts
+        ),
+        cron '@daily'
+    );
+
+    select account_id, ts from test.external_table where ts between @start_ts and @end_ts;
+    """
+
+    models_dir = tmp_path / "models"
+    models_dir.mkdir()
+
+    (models_dir / "a.sql").write_text(model_a)
+
+    config = Config(model_defaults=ModelDefaultsConfig(dialect="duckdb"))
+    ctx = Context(paths=[tmp_path], config=config)
+
+    engine_adapter = ctx.engine_adapter
+    engine_adapter.create_schema("test")
+
+    # source data
+    df = pd.DataFrame(
+        {
+            "account_id": [1001, 1002, 1003, 1004],
+            "ts": [
+                "2024-01-01 00:30:00",
+                "2024-01-01 01:30:00",
+                "2024-01-01 02:30:00",
+                "2024-01-02 00:30:00",
+            ],
+        }
+    )
+    columns_to_types = {
+        "account_id": exp.DataType.build("int"),
+        "ts": exp.DataType.build("timestamp"),
+    }
+    external_table = exp.table_(table="external_table", db="test", quoted=True)
+    engine_adapter.create_table(table_name=external_table, columns_to_types=columns_to_types)
+    engine_adapter.insert_append(
+        table_name=external_table, query_or_df=df, columns_to_types=columns_to_types
+    )
+
+    # plan + apply A[hourly] in dev
+    ctx.plan("dev", auto_apply=True, no_prompts=True)
+
+    # add B[daily] in prod and remove A
+    (models_dir / "b.sql").write_text(model_b)
+    (models_dir / "a.sql").unlink()
+
+    # plan + apply prod
+    ctx.load()
+    ctx.plan(auto_apply=True, no_prompts=True)
+
+    # restate B in prod
+    ctx.plan(
+        restate_models=["test.b"],
+        start="2024-01-01",
+        end="2024-01-02",
+        auto_apply=True,
+        no_prompts=True,
+    )
+
+
 @time_machine.travel("2023-01-08 15:00:00 UTC")
 def test_plan_against_expired_environment(init_and_plan_context: t.Callable):
     context, plan = init_and_plan_context("examples/sushi")