Skip to content

Commit

Permalink
Update manifest loading to _always_ check microbatch model inputs
Browse files Browse the repository at this point in the history
Of note we are at the point where multiple validations are iterating
all of the nodes in a manifest. We should refactor these _soon_ such that
we are not iterating over the same list multiple times.
  • Loading branch information
QMalcolm committed Jan 7, 2025
1 parent e062763 commit a2f585f
Showing 1 changed file with 23 additions and 15 deletions.
38 changes: 23 additions & 15 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ def load(self) -> Manifest:
self.check_for_spaces_in_resource_names()
self.check_for_microbatch_deprecations()
self.check_forcing_batch_concurrency()
self.check_microbatch_model_has_a_filtered_input()

return self.manifest

Expand Down Expand Up @@ -1472,21 +1473,6 @@ def check_valid_microbatch_config(self):
f"Microbatch model '{node.name}' optional 'concurrent_batches' config must be of type `bool` if specified, but got: {type(concurrent_batches)})."
)

# Validate upstream node event_time (if configured)
has_input_with_event_time_config = False
for input_unique_id in node.depends_on.nodes:
input_node = self.manifest.expect(unique_id=input_unique_id)
input_event_time = input_node.config.event_time
if input_event_time:
if not isinstance(input_event_time, str):
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}."
)
has_input_with_event_time_config = True

if not has_input_with_event_time_config:
fire_event(MicrobatchModelNoEventTimeInputs(model_name=node.name))

def check_forcing_batch_concurrency(self) -> None:
if self.manifest.use_microbatch_batches(project_name=self.root_project.project_name):
adapter = get_adapter(self.root_project)
Expand All @@ -1508,6 +1494,28 @@ def check_forcing_batch_concurrency(self) -> None:
)
)

def check_microbatch_model_has_a_filtered_input(self):
if self.manifest.use_microbatch_batches(project_name=self.root_project.project_name):
for node in self.manifest.nodes.values():
if (
node.config.materialized == "incremental"
and node.config.incremental_strategy == "microbatch"
):
# Validate upstream node event_time (if configured)
has_input_with_event_time_config = False
for input_unique_id in node.depends_on.nodes:
input_node = self.manifest.expect(unique_id=input_unique_id)
input_event_time = input_node.config.event_time
if input_event_time:
if not isinstance(input_event_time, str):
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}."
)
has_input_with_event_time_config = True

if not has_input_with_event_time_config:
fire_event(MicrobatchModelNoEventTimeInputs(model_name=node.name))

def write_perf_info(self, target_path: str):
path = os.path.join(target_path, PERF_INFO_FILE_NAME)
write_file(path, json.dumps(self._perf_info, cls=dbt.utils.JSONEncoder, indent=4))
Expand Down

0 comments on commit a2f585f

Please sign in to comment.