From e062763eff41089ec23535c8619dfd443cec67f8 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 7 Jan 2025 17:23:11 -0600 Subject: [PATCH 1/3] Update `TestMicrobatchWithInputWithoutEventTime` to check running again raises warning The first time the project is run, the appropriate warning about inputs is raised. However, the warning is only being raised when a full parse happens. When partial parsing happens the warning isn't getting raised. In the next commit we'll fix this issue. This commit updates the test to show that the second run (with partial parsing) doesn't raise the update, and thus the test fails. --- tests/functional/microbatch/test_microbatch.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 953b372b226..ef747138fbd 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -464,9 +464,11 @@ def test_run_with_event_time(self, project): assert len(catcher.caught_events) == 1 # our partition grain is "day" so running the same day without new data should produce the same results + catcher.caught_events = [] with patch_microbatch_end_time("2020-01-03 14:57:00"): - run_dbt(["run"]) + run_dbt(["run"], callbacks=[catcher.catch]) self.assert_row_count(project, "microbatch_model", 3) + assert len(catcher.caught_events) == 1 # add next two days of data test_schema_relation = project.adapter.Relation.create( From a2f585fa7c3aa998dc3278968d131efa8f0549c3 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 7 Jan 2025 17:31:01 -0600 Subject: [PATCH 2/3] Update manifest loading to _always_ check microbatch model inputs Of note we are at the point where multiple validations are iterating all of the nodes in a manifest. We should refactor these _soon_ such that we are not iterating over the same list multiple times. --- core/dbt/parser/manifest.py | 38 ++++++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index ba42c6637d3..023c5db9300 100644 --- a/core/dbt/parser/manifest.py +++ b/core/dbt/parser/manifest.py @@ -513,6 +513,7 @@ def load(self) -> Manifest: self.check_for_spaces_in_resource_names() self.check_for_microbatch_deprecations() self.check_forcing_batch_concurrency() + self.check_microbatch_model_has_a_filtered_input() return self.manifest @@ -1472,21 +1473,6 @@ def check_valid_microbatch_config(self): f"Microbatch model '{node.name}' optional 'concurrent_batches' config must be of type `bool` if specified, but got: {type(concurrent_batches)})." ) - # Validate upstream node event_time (if configured) - has_input_with_event_time_config = False - for input_unique_id in node.depends_on.nodes: - input_node = self.manifest.expect(unique_id=input_unique_id) - input_event_time = input_node.config.event_time - if input_event_time: - if not isinstance(input_event_time, str): - raise dbt.exceptions.ParsingError( - f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}." - ) - has_input_with_event_time_config = True - - if not has_input_with_event_time_config: - fire_event(MicrobatchModelNoEventTimeInputs(model_name=node.name)) - def check_forcing_batch_concurrency(self) -> None: if self.manifest.use_microbatch_batches(project_name=self.root_project.project_name): adapter = get_adapter(self.root_project) @@ -1508,6 +1494,28 @@ def check_forcing_batch_concurrency(self) -> None: ) ) + def check_microbatch_model_has_a_filtered_input(self): + if self.manifest.use_microbatch_batches(project_name=self.root_project.project_name): + for node in self.manifest.nodes.values(): + if ( + node.config.materialized == "incremental" + and node.config.incremental_strategy == "microbatch" + ): + # Validate upstream node event_time (if configured) + has_input_with_event_time_config = False + for input_unique_id in node.depends_on.nodes: + input_node = self.manifest.expect(unique_id=input_unique_id) + input_event_time = input_node.config.event_time + if input_event_time: + if not isinstance(input_event_time, str): + raise dbt.exceptions.ParsingError( + f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}." + ) + has_input_with_event_time_config = True + + if not has_input_with_event_time_config: + fire_event(MicrobatchModelNoEventTimeInputs(model_name=node.name)) + def write_perf_info(self, target_path: str): path = os.path.join(target_path, PERF_INFO_FILE_NAME) write_file(path, json.dumps(self._perf_info, cls=dbt.utils.JSONEncoder, indent=4)) From 5b1f64c2d04b584ae9b7b2dacd93a3dcb1731268 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 7 Jan 2025 17:37:28 -0600 Subject: [PATCH 3/3] Add changie doc --- .changes/unreleased/Fixes-20250107-173719.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Fixes-20250107-173719.yaml diff --git a/.changes/unreleased/Fixes-20250107-173719.yaml b/.changes/unreleased/Fixes-20250107-173719.yaml new file mode 100644 index 00000000000..2d2310f1bac --- /dev/null +++ b/.changes/unreleased/Fixes-20250107-173719.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Ensure warning about microbatch lacking filter inputs is always fired +time: 2025-01-07T17:37:19.373261-06:00 +custom: + Author: QMalcolm + Issue: "11159"