Skip to content

Commit

Permalink
fix: handle missing signals in built in scheduler closes #3094
Browse files Browse the repository at this point in the history
if a signal marks an interval as 'unready', then all downstream jobs
should also consider than interval as unready.
  • Loading branch information
tobymao committed Oct 17, 2024
1 parent 259778d commit 198d106
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 4 deletions.
25 changes: 24 additions & 1 deletion sqlmesh/core/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,17 @@ def compute_interval_params(
"""
snapshot_batches = {}

snapshots_by_name = {snapshot.name: snapshot for snapshot in snapshots}

dag = DAG[str]()

for snapshot in snapshots_by_name.values():
dag.add(snapshot.name, snapshot.model.depends_on)

all_unready_intervals: t.Dict[str, set[SnapshotInterval]] = {}

for snapshot, intervals in missing_intervals(
snapshots,
[snapshots_by_name[fqn] for fqn in dag if fqn in snapshots],
start=start,
end=end,
execution_time=execution_time,
Expand All @@ -549,19 +558,33 @@ def compute_interval_params(
end_bounded=end_bounded,
).items():
if signal_factory and snapshot.is_model:
unready = set(intervals)

for signal in snapshot.model.render_signals(
start=start, end=end, execution_time=execution_time
):
intervals = _check_ready_intervals(
signal=signal_factory(signal),
intervals=intervals,
)
unready -= set(intervals)
else:
unready = set()

for parent in snapshot.model.depends_on:
if parent in all_unready_intervals:
unready.update(all_unready_intervals[parent])

all_unready_intervals[snapshot.name] = unready

batches = []
batch_size = snapshot.node.batch_size
next_batch: t.List[t.Tuple[int, int]] = []

for interval in intervals:
if interval in unready:
continue

if (batch_size and len(next_batch) >= batch_size) or (
next_batch and interval[0] != next_batch[-1][-1]
):
Expand Down
56 changes: 53 additions & 3 deletions tests/core/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
TimeColumn,
)
from sqlmesh.core.node import IntervalUnit
from sqlmesh.core.scheduler import Scheduler, compute_interval_params
from sqlmesh.core.scheduler import Scheduler, compute_interval_params, signal_factory, Batch, Signal
from sqlmesh.core.snapshot import (
Snapshot,
SnapshotEvaluator,
Expand Down Expand Up @@ -623,8 +623,6 @@ def _evaluate():


def test_signal_factory(mocker: MockerFixture, make_snapshot):
from sqlmesh.core.scheduler import signal_factory, Batch, Signal

class AlwaysReadySignal(Signal):
def check_intervals(self, batch: Batch):
return True
Expand Down Expand Up @@ -663,3 +661,55 @@ def factory(signal_metadata):
scheduler.batches(start, end, end)

assert signal_factory_invoked > 0


def test_signal_intervals(mocker: MockerFixture, make_snapshot):
class TestSignal(Signal):
def __init__(self, signal: t.Dict):
self.name = signal["kind"]

def check_intervals(self, batch: Batch):
if self.name == "a":
return [batch[0], batch[1]]
if self.name == "b":
return [batch[1], batch[2]]

a = make_snapshot(
SqlModel(
name="a",
kind="full",
start="2023-01-01",
query=parse_one("SELECT 1 x"),
signals=[{"kind": "a"}],
)
)
b = make_snapshot(
SqlModel(
name="b",
kind="full",
start="2023-01-01",
query=parse_one("SELECT 2 x"),
signals=[{"kind": "b"}],
)
)
c = make_snapshot(
SqlModel(
name="c",
kind="full",
start="2023-01-01",
query=parse_one("select * from a union select * from b"),
)
)

batches = compute_interval_params(
[c, a, b],
start="2023-01-01",
end="2023-01-03",
signal_factory=lambda signal: TestSignal(signal),
)

assert batches == {
a: [(to_datetime("2023-01-01"), to_datetime("2023-01-03"))],
b: [(to_datetime("2023-01-02"), to_datetime("2023-01-04"))],
c: [(to_datetime("2023-01-02"), to_datetime("2023-01-03"))],
}

0 comments on commit 198d106

Please sign in to comment.