Skip to content

Commit

Permalink
Pass a MicrobatchBuilder to MicrobatchBatchRunner
Browse files Browse the repository at this point in the history
This is necessary because the materialization executor needs the
`MicrobatchBuilder` in order to build the jinja context.
  • Loading branch information
QMalcolm committed Feb 24, 2025
1 parent 1a8fbf1 commit 3e857dd
Showing 1 changed file with 112 additions and 120 deletions.
232 changes: 112 additions & 120 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,119 +336,6 @@ def execute(self, model, manifest):
return self._execute_model(hook_ctx, context_config, model, context, materialization_macro)


class MicrobatchModelRunnerOLD(ModelRunner):
    """Legacy runner that materializes one batch of a microbatch model.

    NOTE(review): superseded by `MicrobatchBatchRunner`; kept here only until
    callers migrate.

    Fix: `_execute_microbatch_materialization` previously referenced
    `microbatch_builder` as a free name, which raised a NameError inside the
    `try` block and silently turned every batch into a generic failure. The
    builder is now injected through the constructor (optional, so existing
    call sites remain valid).
    """

    def __init__(
        self,
        config,
        adapter,
        node,
        node_index: int,
        num_nodes: int,
        microbatch_builder: Optional[MicrobatchBuilder] = None,
    ):
        super().__init__(config, adapter, node, node_index, num_nodes)

        # TODO: make these non-optional once batch state moves to the batch runner.
        self.batch_idx: Optional[int] = None
        self.batches: Dict[int, BatchType] = {}
        self.relation_exists: bool = False
        # Needed to build the per-batch jinja context; may be None for
        # backward compatibility with older call sites (in which case batch
        # execution fails per-batch, as the undefined name previously did).
        self.microbatch_builder: Optional[MicrobatchBuilder] = microbatch_builder

    def _build_succesful_run_batch_result(
        self,
        model: ModelNode,
        context: Dict[str, Any],
        batch: BatchType,
        elapsed_time: float = 0.0,
    ) -> RunResult:
        """Return a standard run result with *batch* recorded as successful."""
        # TODO: move to batch runner
        run_result = self._build_run_model_result(model, context, elapsed_time)
        run_result.batch_results = BatchResults(successful=[batch])
        return run_result

    def _build_failed_run_batch_result(
        self,
        model: ModelNode,
        batch: BatchType,
        elapsed_time: float = 0.0,
    ) -> RunResult:
        """Return an error-status run result with *batch* recorded as failed."""
        # TODO: move to batch runner
        return RunResult(
            node=model,
            status=RunStatus.Error,
            timing=[],
            thread_id=threading.current_thread().name,
            execution_time=elapsed_time,
            message="ERROR",
            adapter_response={},
            failures=1,
            batch_results=BatchResults(failed=[batch]),
        )

    def _execute_microbatch_materialization(
        self,
        model: ModelNode,
        context: Dict[str, Any],
        materialization_macro: MacroProtocol,
    ) -> RunResult:
        """Materialize the batch at `self.batch_idx` and return its RunResult.

        Non-interrupt exceptions are converted into a failed batch result
        rather than propagated.
        """
        # TODO: This method should be moved to the batch runner

        batch = self.batches[self.batch_idx]
        # call materialization_macro to get a batch-level run result
        start_time = time.perf_counter()
        try:
            # Update jinja context with batch context members. If no builder
            # was injected this raises inside the try and is reported as a
            # failed batch (matching the prior NameError behavior).
            jinja_context = self.microbatch_builder.build_jinja_context_for_batch(
                incremental_batch=self.relation_exists
            )
            context.update(jinja_context)

            # Materialize batch and cache any materialized relations
            result = MacroGenerator(
                materialization_macro, context, stack=context["context_macro_stack"]
            )()
            for relation in self._materialization_relations(result, model):
                self.adapter.cache_added(relation.incorporate(dbt_created=True))

            # Build result of executed batch
            batch_run_result = self._build_succesful_run_batch_result(
                model, context, batch, time.perf_counter() - start_time
            )
            batch_result = batch_run_result

            # At least one batch has been inserted successfully!
            # Can proceed incrementally + in parallel
            self.relation_exists = True

        except (KeyboardInterrupt, SystemExit):
            # reraise it for GraphRunnableTask.execute_nodes to handle
            raise
        except Exception as e:
            fire_event(
                GenericExceptionOnRun(
                    unique_id=self.node.unique_id,
                    exc=f"Exception on worker thread. {str(e)}",
                    node_info=self.node.node_info,
                )
            )
            batch_run_result = self._build_failed_run_batch_result(
                model, batch, time.perf_counter() - start_time
            )

            batch_result = batch_run_result

        return batch_result

    def _execute_model(
        self,
        hook_ctx: Any,
        context_config: Any,
        model: ModelNode,
        context: Dict[str, Any],
        materialization_macro: MacroProtocol,
    ) -> RunResult:
        """Execute the batch materialization, always firing the post-model hook."""
        # TODO: Move to batch runner
        try:
            batch_result = self._execute_microbatch_materialization(
                model, context, materialization_macro
            )
        finally:
            self.adapter.post_model_hook(context_config, hook_ctx)

        return batch_result


class MicrobatchBatchRunner(ModelRunner):
"""Handles the running of individual batches"""

Expand All @@ -462,12 +349,14 @@ def __init__(
batch_idx: int,
batches: Dict[int, BatchType],
relation_exists: bool,
microbatch_builder: MicrobatchBuilder,
):
super().__init__(config, adapter, node, node_index, num_nodes)

Check warning on line 354 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L354

Added line #L354 was not covered by tests

self.batch_idx = batch_idx
self.batches = batches
self.relation_exists = relation_exists
self.microbatch_builder = microbatch_builder

Check warning on line 359 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L358-L359

Added lines #L358 - L359 were not covered by tests

def describe_node(self) -> str:
# TODO: I'm not sure if we actually need this. We should try removing it once everything
Expand Down Expand Up @@ -583,6 +472,104 @@ def compile(self, manifest: Manifest):

return self.node

Check warning on line 473 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L473

Added line #L473 was not covered by tests

def _build_succesful_run_batch_result(
    self,
    model: ModelNode,
    context: Dict[str, Any],
    batch: BatchType,
    elapsed_time: float = 0.0,
) -> RunResult:
    """Wrap a standard model run result, attaching *batch* as successful."""
    successful_batches = BatchResults(successful=[batch])
    result = self._build_run_model_result(model, context, elapsed_time)
    result.batch_results = successful_batches
    return result

def _build_failed_run_batch_result(
    self,
    model: ModelNode,
    batch: BatchType,
    elapsed_time: float = 0.0,
) -> RunResult:
    """Construct an error-status RunResult recording *batch* as failed."""
    failed_batches = BatchResults(failed=[batch])
    thread_name = threading.current_thread().name
    return RunResult(
        node=model,
        status=RunStatus.Error,
        timing=[],
        thread_id=thread_name,
        execution_time=elapsed_time,
        message="ERROR",
        adapter_response={},
        failures=1,
        batch_results=failed_batches,
    )

def _execute_microbatch_materialization(
    self,
    model: ModelNode,
    context: Dict[str, Any],
    materialization_macro: MacroProtocol,
) -> RunResult:
    """Run the materialization macro for this runner's batch.

    Returns a per-batch RunResult. Interrupts are re-raised for
    GraphRunnableTask.execute_nodes; any other exception is reported via an
    event and converted into a failed batch result instead of propagating.
    """
    batch = self.batches[self.batch_idx]
    started = time.perf_counter()
    try:
        # Merge the batch-specific members into the jinja context.
        batch_jinja_context = self.microbatch_builder.build_jinja_context_for_batch(
            incremental_batch=self.relation_exists
        )
        context.update(batch_jinja_context)

        # Materialize the batch and cache any relations it produced.
        macro_result = MacroGenerator(
            materialization_macro, context, stack=context["context_macro_stack"]
        )()
        for relation in self._materialization_relations(macro_result, model):
            self.adapter.cache_added(relation.incorporate(dbt_created=True))

        batch_result = self._build_succesful_run_batch_result(
            model, context, batch, time.perf_counter() - started
        )

        # At least one batch has landed, so subsequent batches may run
        # incrementally and in parallel.
        self.relation_exists = True
    except (KeyboardInterrupt, SystemExit):
        # Re-raise so GraphRunnableTask.execute_nodes can handle the interrupt.
        raise
    except Exception as e:
        fire_event(
            GenericExceptionOnRun(
                unique_id=self.node.unique_id,
                exc=f"Exception on worker thread. {str(e)}",
                node_info=self.node.node_info,
            )
        )
        batch_result = self._build_failed_run_batch_result(
            model, batch, time.perf_counter() - started
        )

    return batch_result

def _execute_model(
    self,
    hook_ctx: Any,
    context_config: Any,
    model: ModelNode,
    context: Dict[str, Any],
    materialization_macro: MacroProtocol,
) -> RunResult:
    """Execute the batch materialization, always firing the post-model hook."""
    try:
        result = self._execute_microbatch_materialization(
            model, context, materialization_macro
        )
    finally:
        # The post-hook must run even when materialization raises.
        self.adapter.post_model_hook(context_config, hook_ctx)
    return result


class MicrobatchModelRunner(ModelRunner):
"""Handles the orchestration of batches to run for a given microbatch model"""
Expand Down Expand Up @@ -688,12 +675,7 @@ def merge_batch_results(self, result: RunResult, batch_results: List[RunResult])
if self.node.previous_batch_results is not None:
result.batch_results.successful += self.node.previous_batch_results.successful

Check warning on line 676 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L675-L676

Added lines #L675 - L676 were not covered by tests

def get_batches(self, model: ModelNode) -> Dict[int, BatchType]:
"""Get the batches that should be run for the model"""

# TODO: All of this setup for creating MicrobatchBuilder should _only_
# happen if there are failed batches for us to retry

def get_microbatch_builder(self, model: ModelNode) -> MicrobatchBuilder:
# Initially set the start/end to values from args
event_time_start = getattr(self.config.args, "EVENT_TIME_START", None)
event_time_end = getattr(self.config.args, "EVENT_TIME_END", None)

Check warning on line 681 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L680-L681

Added lines #L680 - L681 were not covered by tests
Expand All @@ -705,18 +687,22 @@ def get_batches(self, model: ModelNode) -> Dict[int, BatchType]:
event_time_start = self.config.args.sample.start
event_time_end = self.config.args.sample.end

Check warning on line 688 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L687-L688

Added lines #L687 - L688 were not covered by tests

microbatch_builder = MicrobatchBuilder(
return MicrobatchBuilder(

Check warning on line 690 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L690

Added line #L690 was not covered by tests
model=model,
is_incremental=self._is_incremental(model),
event_time_start=event_time_start,
event_time_end=event_time_end,
default_end_time=get_invocation_started_at(),
)

def get_batches(self, model: ModelNode) -> Dict[int, BatchType]:
"""Get the batches that should be run for the model"""

# Note currently (02/23/2025) model.previous_batch_results is only ever _not_ `None`
# IFF `dbt retry` is being run and the microbatch model had batches which
# failed on the run of the model (which is being retried)
if model.previous_batch_results is None:
microbatch_builder = self.get_microbatch_builder(model)
end = microbatch_builder.build_end_time()
start = microbatch_builder.build_start_time(end)
batches = microbatch_builder.build_batches(start, end)

Check warning on line 708 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L704-L708

Added lines #L704 - L708 were not covered by tests
Expand Down Expand Up @@ -746,6 +732,7 @@ def execute(self, model: ModelNode, manifest: Manifest) -> RunResult:

batch_results: List[RunResult] = []
batch_idx = 0
microbatch_builder = self.get_microbatch_builder(model)

Check warning on line 735 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L735

Added line #L735 was not covered by tests

# Run first batch not in parallel
relation_exists = self.parent_task._submit_batch(

Check warning on line 738 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L738

Added line #L738 was not covered by tests
Expand All @@ -757,6 +744,7 @@ def execute(self, model: ModelNode, manifest: Manifest) -> RunResult:
batch_results=batch_results,
pool=self.pool,
force_sequential_run=True,
microbatch_builder=microbatch_builder,
)
batch_idx += 1
skip_batches = batch_results[0].status != RunStatus.Success
Expand All @@ -772,6 +760,7 @@ def execute(self, model: ModelNode, manifest: Manifest) -> RunResult:
batch_results=batch_results,
pool=self.pool,
skip=skip_batches,
microbatch_builder=microbatch_builder,
)
batch_idx += 1

Expand All @@ -793,6 +782,7 @@ def execute(self, model: ModelNode, manifest: Manifest) -> RunResult:
pool=self.pool,
force_sequential_run=True,
skip=skip_batches,
microbatch_builder=microbatch_builder,
)

# Finalize run: merge results, track model run, and print final result line
Expand Down Expand Up @@ -855,6 +845,7 @@ def _submit_batch(
batch_idx: int,
batch_results: List[RunResult],
pool: ThreadPool,
microbatch_builder: MicrobatchBuilder,
force_sequential_run: bool = False,
skip: bool = False,
):
Expand All @@ -879,6 +870,7 @@ def _submit_batch(
batch_idx,
batches,
relation_exists,
microbatch_builder,
)

if skip:
Expand Down

0 comments on commit 3e857dd

Please sign in to comment.