Rewrite execution of microbatch models to avoid blocking the main thread #11332

Merged
36 commits
1da252a
Add comments to help guide the work of pushing microbatch batch orche…
QMalcolm Feb 23, 2025
90d53c2
Stub out new `MicrobatchModelRunner` and `MicrobatchBatchRunner`.
QMalcolm Feb 23, 2025
b149ec4
Make `handle_job_queue` of `RunTask` call invoke the `MicrobatchModel…
QMalcolm Feb 23, 2025
85b4f23
Straight copy `handle_microbatch_model` into `MicrobatchModelRunner`
QMalcolm Feb 23, 2025
d23095d
Convert `handle_microbatch_model` into runner `execute` type method
QMalcolm Feb 23, 2025
5279140
Begin getting batches in `MicrobatchModelRunner`
QMalcolm Feb 23, 2025
e45c2b1
Begin getting `_has_relation` in execute method of `MicrobatchModelRu…
QMalcolm Feb 23, 2025
c4d1b1c
Begin initializing empty `RunResult` for `MicrobatchModelRunner`
QMalcolm Feb 24, 2025
374a9bd
Make `RunTask` and `ThreadPool` available in `MicrobatchModelRunner`
QMalcolm Feb 24, 2025
a2331df
Make `merge_batch_results` to `MicrobatchModelRunner`
QMalcolm Feb 24, 2025
ce9333c
Begin instantiating `MicrobatchBatchRunner` during `_submit_batch`
QMalcolm Feb 24, 2025
8397264
Move `should_run_in_parallel` to `MicrobatchBatchRunner`
QMalcolm Feb 24, 2025
046e08a
Move `describe_batch` to `MicrobatchBatchRunner`
QMalcolm Feb 24, 2025
86a8439
Move batch print eventing methods to `MicrobatchBatchRunner`
QMalcolm Feb 24, 2025
da1b08c
Move `before_execute` and `after_execute` to `MicrobatchBatchRunner`
QMalcolm Feb 24, 2025
4bc33fb
Move `describe_node` to `MicrobatchBatchRunner` and `MicrobatchModelR…
QMalcolm Feb 24, 2025
df3a3f1
Implement `compile` for `MicrobatchBatchRunner` and `MicrobatchModelR…
QMalcolm Feb 24, 2025
1a8fbf1
Implement `on_skip` for `MicrobatchBatchRunner`
QMalcolm Feb 24, 2025
590fbb0
Move batch execution logic from old runner into `MicrobatchBatchRunner`
QMalcolm Feb 25, 2025
1c54050
Make `build_jinja_context_for_batch` a static method
QMalcolm Feb 25, 2025
5c5558e
Ensure first batch is run as full refresh when relevant
QMalcolm Feb 25, 2025
e60ceb6
Update `BuildTask` to use `MicrobatchModelRunner` directly
QMalcolm Feb 25, 2025
ce0d4c1
Fix mocking of `should_run_in_parallel` in microbatch tests
QMalcolm Feb 25, 2025
7ec1ce5
Fix unit tests for `build_jinja_context_for_batch`
QMalcolm Feb 26, 2025
35bc7ce
Fix unit tests for `test_should_run_in_parallel`
QMalcolm Feb 26, 2025
a4f62e3
Allow for cancellation of microbatch runs when batches are running se…
QMalcolm Mar 3, 2025
94fc86c
Enable graceful shutdown of microbatch model executing batches concur…
QMalcolm Mar 3, 2025
c07e071
Remove unecessary tracking/printing at end of `MicrobatchModelRunner.…
QMalcolm Mar 3, 2025
1ba67a9
Remove `describe_node` method from `MicrobatchBatchRunner`
QMalcolm Mar 3, 2025
20a1446
Rename print line methods in `MicrobatchBatchRunner` to simplify impl…
QMalcolm Mar 3, 2025
c81010e
Abstract initialization of `MicrobatchBuilder` into utility method
QMalcolm Mar 3, 2025
c8b78f1
Remove special skip logic in `MicrobatchModelRunner.execute`
QMalcolm Mar 3, 2025
303cdf7
Remove TODO statement which is no longer relevant
QMalcolm Mar 3, 2025
486f351
Add changie doc
QMalcolm Mar 3, 2025
a32a7ae
Use setters for `parent_task` and `pool` of `MicrobatchModelRunner`
QMalcolm Mar 3, 2025
ef461da
Add comments around the necessity of `_parent_task` and `_pool` on `M…
QMalcolm Mar 3, 2025
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20250303-131440.yaml
@@ -0,0 +1,6 @@
+kind: Fixes
+body: Fix microbatch execution to not block main thread nor hang
+time: 2025-03-03T13:14:40.432874-06:00
+custom:
+  Author: QMalcolm
+  Issue: 11243 11306
18 changes: 18 additions & 0 deletions core/dbt/graph/thread_pool.py
@@ -0,0 +1,18 @@
+from __future__ import annotations
+
+from multiprocessing.pool import ThreadPool
+
+
+class DbtThreadPool(ThreadPool):

Review comment (Contributor, Author): We created DbtThreadPool so that we can have visibility on whether .close() has been called on the pool. This class is now used instead of ThreadPool.

+    """A ThreadPool that tracks whether or not it's been closed"""
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.closed = False
+
+    def close(self):
+        self.closed = True
+        super().close()
+
+    def is_closed(self):
+        return self.closed
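
To illustrate the review comment above, here is a minimal sketch of how a caller might consult the closed flag before handing more work to the pool. The `DbtThreadPool` body mirrors the diff; `submit_if_open` is a hypothetical helper introduced only for this example and is not claimed to exist in dbt.

```python
from __future__ import annotations

from multiprocessing.pool import ThreadPool


class DbtThreadPool(ThreadPool):
    """A ThreadPool that tracks whether or not it's been closed (mirrors the diff)."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.closed = False

    def close(self):
        self.closed = True
        super().close()

    def is_closed(self):
        return self.closed


def submit_if_open(pool, func, args):
    """Hypothetical helper: submit work only while the pool still accepts it."""
    if pool.is_closed():
        # The pool no longer accepts tasks; let the caller decide what to do.
        return None
    return pool.apply_async(func, args=args)


pool = DbtThreadPool(2)
first = submit_if_open(pool, pow, (2, 5))
value = first.get(timeout=5)

pool.close()
second = submit_if_open(pool, pow, (2, 5))  # skipped: the pool is closed
pool.join()
```

Without such a check, calling `apply_async` on a closed pool raises `ValueError` on current CPython versions, so the flag lets a caller choose a fallback instead of handling an exception.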
9 changes: 5 additions & 4 deletions core/dbt/materializations/incremental/microbatch.py
@@ -100,7 +100,8 @@ def build_batches(self, start: datetime, end: datetime) -> List[BatchType]:

         return batches

-    def build_jinja_context_for_batch(self, incremental_batch: bool) -> Dict[str, Any]:
+    @staticmethod
+    def build_jinja_context_for_batch(model: ModelNode, incremental_batch: bool) -> Dict[str, Any]:
         """
         Create context with entries that reflect microbatch model + incremental execution state

@@ -109,9 +110,9 @@ def build_jinja_context_for_batch(self, incremental_batch: bool) -> Dict[str, An
         jinja_context: Dict[str, Any] = {}

         # Microbatch model properties
-        jinja_context["model"] = self.model.to_dict()
-        jinja_context["sql"] = self.model.compiled_code
-        jinja_context["compiled_code"] = self.model.compiled_code
+        jinja_context["model"] = model.to_dict()
+        jinja_context["sql"] = model.compiled_code
+        jinja_context["compiled_code"] = model.compiled_code

         # Add incremental context variables for batches running incrementally
         if incremental_batch:
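
One effect of the change above is that the context can be built from a model alone, without constructing a builder instance. The following is a rough sketch of that calling pattern only. `FakeModel` and `SketchBuilder` are hypothetical stand-ins (not dbt's `ModelNode` or `MicrobatchBuilder`), and the incremental-only entries are left out because the diff elides them.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict


@dataclass
class FakeModel:
    """Hypothetical stand-in for a model node; only what this sketch needs."""

    name: str
    compiled_code: str

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)


class SketchBuilder:
    """Hypothetical class holding the static method, for illustration only."""

    @staticmethod
    def build_jinja_context_for_batch(model: FakeModel, incremental_batch: bool) -> Dict[str, Any]:
        jinja_context: Dict[str, Any] = {}

        # Microbatch model properties, as in the diff
        jinja_context["model"] = model.to_dict()
        jinja_context["sql"] = model.compiled_code
        jinja_context["compiled_code"] = model.compiled_code

        # The entries added when incremental_batch is True are not shown in the
        # diff, so this sketch deliberately does not reproduce them.
        return jinja_context


model = FakeModel(name="events", compiled_code="select 1")
# No builder instance is required: the method is called on the class.
context = SketchBuilder.build_jinja_context_for_batch(model, incremental_batch=False)
```

Because the method no longer reads `self.model`, a caller that only holds a model (for example, a per-batch runner) can use it directly.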
12 changes: 7 additions & 5 deletions core/dbt/task/build.py
@@ -131,7 +131,8 @@
             runner.do_skip(cause=cause)

         if isinstance(runner, MicrobatchModelRunner):
-            return self.handle_microbatch_model(runner, pool)
+            runner.set_parent_task(self)
+            runner.set_pool(pool)

Codecov (codecov/patch) warning on core/dbt/task/build.py#L134-L135: added lines #L134 - L135 were not covered by tests.

         return self.call_runner(runner)

@@ -146,10 +147,11 @@
             runner.do_skip(cause=cause)

         if isinstance(runner, MicrobatchModelRunner):
-            callback(self.handle_microbatch_model(runner, pool))
-        else:
-            args = [runner]
-            self._submit(pool, args, callback)
+            runner.set_parent_task(self)
+            runner.set_pool(pool)
+
+        args = [runner]
+        self._submit(pool, args, callback)

     # Make a map of model unique_ids to selected unit test unique_ids,
     # for processing before the model.
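
The `build.py` change hands the task and the pool to the runner and then submits the runner like any other node, so the caller no longer orchestrates batches itself. The sketch below shows that general shape under stated assumptions. `SketchTask`, `SketchModelRunner`, and `run_batch` are hypothetical names, and the real runner's batch logic (sequential first batch, cancellation, result merging) is not modeled.

```python
from multiprocessing.pool import ThreadPool
from typing import List, Optional


class SketchTask:
    """Hypothetical stand-in for the parent task; it only knows how to run one batch."""

    def run_batch(self, batch: int) -> str:
        return f"batch-{batch}-ok"


class SketchModelRunner:
    """Hypothetical runner that orchestrates its own batches once given a task and pool."""

    def __init__(self, batches: List[int]) -> None:
        self.batches = batches
        self._parent_task: Optional[SketchTask] = None
        self._pool: Optional[ThreadPool] = None

    def set_parent_task(self, task: SketchTask) -> None:
        self._parent_task = task

    def set_pool(self, pool: ThreadPool) -> None:
        self._pool = pool

    def execute(self) -> List[str]:
        if self._parent_task is None or self._pool is None:
            raise RuntimeError("parent task and pool must be set before execute()")
        # Fan the batches out to the shared pool, then wait for them here,
        # inside a worker thread rather than in the caller's thread.
        pending = [
            self._pool.apply_async(self._parent_task.run_batch, args=(batch,))
            for batch in self.batches
        ]
        return [result.get(timeout=10) for result in pending]


task = SketchTask()
pool = ThreadPool(4)
runner = SketchModelRunner(batches=[1, 2, 3])
runner.set_parent_task(task)
runner.set_pool(pool)

# The caller submits the runner and gets control back immediately.
async_result = pool.apply_async(runner.execute)
results = async_result.get(timeout=10)

pool.close()
pool.join()
```

In this sketch the runner occupies one worker while it waits, so a pool with a single worker would never get to the batches. The example therefore uses a pool larger than one; how the actual runner handles that case is not shown in the diff above.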