Skip to content

Commit

Permalink
Feat: Support the empty backfill mode for the plan command
Browse files Browse the repository at this point in the history
  • Loading branch information
izeigerman committed Oct 30, 2024
1 parent a6d3595 commit fa44c8b
Show file tree
Hide file tree
Showing 13 changed files with 78 additions and 2 deletions.
2 changes: 1 addition & 1 deletion docs/guides/table_migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ Consider an existing table named `my_schema.existing_table`. Migrating this tabl

b. Specify the start of the first time interval SQLMesh should track in the `MODEL` DDL `start` key (example uses "2024-01-01")

c. Create the model in the SQLMesh project without backfilling any data by running `sqlmesh plan [environment name] --skip-backfill --start 2024-01-01`, replacing "[environment name]" with an environment name other than `prod` and using the same start date from the `MODEL` DDL in step 3b.
c. Create the model in the SQLMesh project without backfilling any data by running `sqlmesh plan [environment name] --empty-backfill --start 2024-01-01`, replacing "[environment name]" with an environment name other than `prod` and using the same start date from the `MODEL` DDL in step 3b.

4. Determine the name of the model's snapshot physical table by running `sqlmesh table_name my_schema.existing_table`. For example, it might return `sqlmesh__my_schema.existing_table_123456`.
5. Rename the original table `my_schema.existing_table_temp` to `sqlmesh__my_schema.existing_table_123456`
Expand Down
4 changes: 4 additions & 0 deletions docs/reference/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,10 @@ Options:
matching models in the target environment.
--skip-backfill, --dry-run Skip the backfill step and only create a
virtual update for the plan.
--empty-backfill Produce an empty backfill. Like --skip-backfill,
no models will be backfilled; unlike --skip-backfill,
missing intervals will be recorded as if they
were backfilled.
--forward-only Create a plan for forward-only changes.
--allow-destructive-model TEXT Allow destructive forward-only changes to
models whose names match the expression.
Expand Down
5 changes: 5 additions & 0 deletions sqlmesh/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,11 @@ def diff(ctx: click.Context, environment: t.Optional[str] = None) -> None:
is_flag=True,
help="Skip the backfill step and only create a virtual update for the plan.",
)
@click.option(
"--empty-backfill",
is_flag=True,
help="Produce empty backfill. Like --skip-backfill no models will be backfilled, unlike --skip-backfill missing intervals will be recorded as if they were backfilled.",
)
@click.option(
"--forward-only",
is_flag=True,
Expand Down
6 changes: 6 additions & 0 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,7 @@ def plan(
restate_models: t.Optional[t.Iterable[str]] = None,
no_gaps: bool = False,
skip_backfill: bool = False,
empty_backfill: bool = False,
forward_only: t.Optional[bool] = None,
allow_destructive_models: t.Optional[t.Collection[str]] = None,
no_prompts: t.Optional[bool] = None,
Expand Down Expand Up @@ -1015,6 +1016,7 @@ def plan(
part of the target environment have no data gaps when compared against previous
snapshots for same models.
skip_backfill: Whether to skip the backfill step. Default: False.
empty_backfill: Like skip_backfill, but also records processed intervals.
forward_only: Whether the purpose of the plan is to make forward only changes.
allow_destructive_models: Models whose forward-only changes are allowed to be destructive.
no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that
Expand Down Expand Up @@ -1047,6 +1049,7 @@ def plan(
restate_models=restate_models,
no_gaps=no_gaps,
skip_backfill=skip_backfill,
empty_backfill=empty_backfill,
forward_only=forward_only,
allow_destructive_models=allow_destructive_models,
no_auto_categorization=no_auto_categorization,
Expand Down Expand Up @@ -1082,6 +1085,7 @@ def plan_builder(
restate_models: t.Optional[t.Iterable[str]] = None,
no_gaps: bool = False,
skip_backfill: bool = False,
empty_backfill: bool = False,
forward_only: t.Optional[bool] = None,
allow_destructive_models: t.Optional[t.Collection[str]] = None,
no_auto_categorization: t.Optional[bool] = None,
Expand Down Expand Up @@ -1113,6 +1117,7 @@ def plan_builder(
part of the target environment have no data gaps when compared against previous
snapshots for same models.
skip_backfill: Whether to skip the backfill step. Default: False.
empty_backfill: Like skip_backfill, but also records processed intervals.
forward_only: Whether the purpose of the plan is to make forward only changes.
allow_destructive_models: Models whose forward-only changes are allowed to be destructive.
no_auto_categorization: Indicates whether to disable automatic categorization of model
Expand Down Expand Up @@ -1246,6 +1251,7 @@ def plan_builder(
backfill_models=backfill_models,
no_gaps=no_gaps,
skip_backfill=skip_backfill,
empty_backfill=empty_backfill,
is_dev=is_dev,
forward_only=(
forward_only if forward_only is not None else self.config.plan.forward_only
Expand Down
4 changes: 4 additions & 0 deletions sqlmesh/core/plan/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class PlanBuilder:
part of the target environment have no data gaps when compared against previous
snapshots for same nodes.
skip_backfill: Whether to skip the backfill step.
empty_backfill: Like skip_backfill, but also records processed intervals.
is_dev: Whether this plan is for development purposes.
forward_only: Whether the purpose of the plan is to make forward only changes.
allow_destructive_models: A list of fully qualified model names whose forward-only changes are allowed to be destructive.
Expand Down Expand Up @@ -83,6 +84,7 @@ def __init__(
backfill_models: t.Optional[t.Iterable[str]] = None,
no_gaps: bool = False,
skip_backfill: bool = False,
empty_backfill: bool = False,
is_dev: bool = False,
forward_only: bool = False,
allow_destructive_models: t.Optional[t.Iterable[str]] = None,
Expand All @@ -104,6 +106,7 @@ def __init__(
self._context_diff = context_diff
self._no_gaps = no_gaps
self._skip_backfill = skip_backfill
self._empty_backfill = empty_backfill
self._is_dev = is_dev
self._forward_only = forward_only
self._allow_destructive_models = set(
Expand Down Expand Up @@ -251,6 +254,7 @@ def build(self) -> Plan:
provided_end=self._end,
is_dev=self._is_dev,
skip_backfill=self._skip_backfill,
empty_backfill=self._empty_backfill,
no_gaps=self._no_gaps,
forward_only=self._forward_only,
allow_destructive_models=t.cast(t.Set, self._allow_destructive_models),
Expand Down
9 changes: 8 additions & 1 deletion sqlmesh/core/plan/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class Plan(PydanticModel, frozen=True):

is_dev: bool
skip_backfill: bool
empty_backfill: bool
no_gaps: bool
forward_only: bool
allow_destructive_models: t.Set[str]
Expand Down Expand Up @@ -95,7 +96,11 @@ def previous_plan_id(self) -> t.Optional[str]:

@property
def requires_backfill(self) -> bool:
return not self.skip_backfill and (bool(self.restatements) or bool(self.missing_intervals))
return (
not self.skip_backfill
and not self.empty_backfill
and (bool(self.restatements) or bool(self.missing_intervals))
)

@property
def has_changes(self) -> bool:
Expand Down Expand Up @@ -264,6 +269,7 @@ def to_evaluatable(self) -> EvaluatablePlan:
environment=self.environment,
no_gaps=self.no_gaps,
skip_backfill=self.skip_backfill,
empty_backfill=self.empty_backfill,
restatements={s.name: i for s, i in self.restatements.items()},
is_dev=self.is_dev,
allow_destructive_models=self.allow_destructive_models,
Expand Down Expand Up @@ -295,6 +301,7 @@ class EvaluatablePlan(PydanticModel):
environment: Environment
no_gaps: bool
skip_backfill: bool
empty_backfill: bool
restatements: t.Dict[str, Interval]
is_dev: bool
allow_destructive_models: t.Set[str]
Expand Down
17 changes: 17 additions & 0 deletions sqlmesh/core/plan/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,23 @@ def _backfill(
plan: The plan to source snapshots from.
selected_snapshots: The snapshots to backfill.
"""
if plan.empty_backfill:
intervals_to_add = []
for snapshot in snapshots_by_name.values():
intervals = [snapshot.inclusive_exclusive(plan.start, plan.end, strict=False)]
is_deployable = deployability_index.is_deployable(snapshot)
intervals_to_add.append(
SnapshotIntervals(
name=snapshot.name,
identifier=snapshot.identifier,
version=snapshot.version,
intervals=intervals if is_deployable else [],
dev_intervals=intervals if not is_deployable else [],
)
)
self.state_sync.add_snapshots_intervals(intervals_to_add)
return

if not plan.requires_backfill or not selected_snapshots:
return

Expand Down
6 changes: 6 additions & 0 deletions sqlmesh/magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,11 @@ def test(self, context: Context, line: str, test_def_raw: t.Optional[str] = None
action="store_true",
help="Skip the backfill step and only create a virtual update for the plan.",
)
@argument(
"--empty-backfill",
action="store_true",
help="Produce empty backfill. Like --skip-backfill no models will be backfilled, unlike --skip-backfill missing intervals will be recorded as if they were backfilled.",
)
@argument(
"--forward-only",
action="store_true",
Expand Down Expand Up @@ -418,6 +423,7 @@ def plan(self, context: Context, line: str) -> None:
backfill_models=args.backfill_model,
no_gaps=args.no_gaps,
skip_backfill=args.skip_backfill,
empty_backfill=args.empty_backfill,
forward_only=args.forward_only,
no_prompts=args.no_prompts,
auto_apply=args.auto_apply,
Expand Down
18 changes: 18 additions & 0 deletions tests/core/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -1698,6 +1698,24 @@ def test_create_environment_no_changes_with_selector(init_and_plan_context: t.Ca
assert {o.name for o in schema_objects} == {"top_waiters"}


@freeze_time("2023-01-08 15:00:00")
def test_empty_bacfkill(init_and_plan_context: t.Callable):
    """Check that an empty-backfill plan writes no rows yet leaves nothing to backfill.

    A ``prod`` plan is created with ``empty_backfill=True`` and applied. Every
    model that is neither a seed nor of a symbolic kind must then contain zero
    rows. A second, ordinary ``prod`` plan must report no required backfill,
    no changes and no missing intervals.
    """
    # NOTE(review): the function name looks like a typo of "backfill"; it is
    # kept as-is so the test identifier does not change.
    context, _unused = init_and_plan_context("examples/sushi")

    empty_plan = context.plan(
        "prod",
        no_prompts=True,
        skip_tests=True,
        empty_backfill=True,
    )
    context.apply(empty_plan)

    for model in context.models.values():
        # Only models that are neither seeds nor symbolic are checked for emptiness.
        if not model.is_seed and not model.kind.is_symbolic:
            count_query = f"SELECT COUNT(*) FROM {model.name}"
            result_row = context.engine_adapter.fetchone(count_query)
            row_num = result_row[0]
            assert row_num == 0

    # Planning again without the flag should find no outstanding work.
    follow_up_plan = context.plan("prod", no_prompts=True, skip_tests=True)
    assert not follow_up_plan.requires_backfill
    assert not follow_up_plan.has_changes
    assert not follow_up_plan.missing_intervals


@pytest.mark.parametrize(
"context_fixture",
["sushi_context", "sushi_dbt_context", "sushi_test_dbt_context", "sushi_no_default_catalog"],
Expand Down
1 change: 1 addition & 0 deletions tests/core/test_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,7 @@ def test_missing_intervals_lookback(make_snapshot, mocker: MockerFixture):
execution_time="2022-01-05 12:00",
is_dev=True,
skip_backfill=False,
empty_backfill=False,
no_gaps=False,
forward_only=False,
allow_destructive_models=set(),
Expand Down
2 changes: 2 additions & 0 deletions tests/schedulers/airflow/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def test_apply_plan(mocker: MockerFixture, snapshot: Snapshot):
environment=environment,
no_gaps=False,
skip_backfill=False,
empty_backfill=False,
restatements={snapshot.name: (to_timestamp("2024-01-01"), to_timestamp("2024-01-02"))},
is_dev=False,
allow_destructive_models=set(),
Expand Down Expand Up @@ -176,6 +177,7 @@ def test_apply_plan(mocker: MockerFixture, snapshot: Snapshot):
},
"no_gaps": False,
"skip_backfill": False,
"empty_backfill": False,
"is_dev": False,
"forward_only": False,
"allow_destructive_models": [],
Expand Down
1 change: 1 addition & 0 deletions tests/schedulers/airflow/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def _create_evaluatable_plan(
environment=environment,
no_gaps=False,
skip_backfill=False,
empty_backfill=False,
restatements={},
is_dev=is_dev,
allow_destructive_models=set(),
Expand Down
5 changes: 5 additions & 0 deletions tests/schedulers/airflow/test_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def test_create_plan_dag_spec(
environment=new_environment,
no_gaps=True,
skip_backfill=False,
empty_backfill=False,
restatements={},
is_dev=False,
forward_only=True,
Expand Down Expand Up @@ -249,6 +250,7 @@ def test_restatement(
environment=new_environment,
no_gaps=True,
skip_backfill=False,
empty_backfill=False,
restatements={
the_snapshot.name: (
to_timestamp("2022-01-02"),
Expand Down Expand Up @@ -374,6 +376,7 @@ def test_select_models_for_backfill(mocker: MockerFixture, random_name, make_sna
environment=new_environment,
no_gaps=True,
skip_backfill=False,
empty_backfill=False,
restatements={},
is_dev=False,
forward_only=True,
Expand Down Expand Up @@ -458,6 +461,7 @@ def test_create_plan_dag_spec_duplicated_snapshot(
environment=new_environment,
no_gaps=False,
skip_backfill=False,
empty_backfill=False,
restatements={},
is_dev=False,
forward_only=False,
Expand Down Expand Up @@ -519,6 +523,7 @@ def test_create_plan_dag_spec_unbounded_end(
environment=new_environment,
no_gaps=True,
skip_backfill=False,
empty_backfill=False,
restatements={},
is_dev=False,
forward_only=False,
Expand Down

0 comments on commit fa44c8b

Please sign in to comment.