Rewrite execution of microbatch models to avoid blocking the main thread #11332
Conversation
…stration to a runner We're working to ensure that the orchestration of microbatch batches doesn't block the main thread. This will require disentangling a lot of logic that currently lives in `run.py`. As such, it made sense to "quickly" stub out a guide of what needs to be done.
The `MicrobatchBatchRunner` will be for running individual batches, whereas the `MicrobatchModelRunner` will handle the orchestration of the batches to be run for a given model.
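As a rough illustration of that split (the class names come from the description above, but the constructors and method bodies are invented for this sketch and are not dbt-core's actual implementation):

```python
class MicrobatchBatchRunner:
    """Runs a single batch of a microbatch model (sketch)."""

    def __init__(self, batch):
        self.batch = batch

    def run(self):
        # In the real runner this would execute the SQL for just this batch.
        return f"ran batch {self.batch}"


class MicrobatchModelRunner:
    """Orchestrates the batches of a microbatch model (sketch)."""

    def __init__(self, batches):
        self.batches = batches

    def run(self):
        # Delegate each batch to its own MicrobatchBatchRunner.
        return [MicrobatchBatchRunner(b).run() for b in self.batches]
```

The key point of the split is that the model-level runner never touches batch execution itself; it only decides which batches to hand off.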
…Runner` directly Previously `handle_job_queue` treated `MicrobatchModelRunner` as a special case, delegating to `handle_microbatch_model` to orchestrate the batches instead of delegating to the `MicrobatchModelRunner` directly. Now that the `MicrobatchModelRunner` handles batch orchestration, we can delegate to it directly and remove the special casing.
The function won't work as is, but I felt it better to straight copy, commit, and then modify it to work in the runner context iteratively.
Codecov Report
Attention: Patch coverage is
Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main   #11332      +/-   ##
==========================================
- Coverage   88.97%   88.86%    -0.12%
==========================================
  Files         189      190        +1
  Lines       24182    24197       +15
==========================================
- Hits        21517    21503       -14
- Misses       2665     2694       +29
```

Flags with carried forward coverage won't be shown.
We don't need these functions in `MicrobatchModelRunner` because the versions of these methods inherited from `ModelRunner` work for our needs. Of note, we can probably also remove the need for these functions in `MicrobatchBatchRunner` by renaming `print_batch_start_line` and `print_batch_result_line` to the method names that the `ModelRunner` methods call.
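The renaming idea above is the template-method pattern; here is an illustrative sketch (the hook names `print_start_line` and the trivial bodies are hypothetical, not dbt-core's actual ones):

```python
class ModelRunner:
    """Parent runner: its entry point calls a fixed hook name (sketch)."""

    def print_start_line(self):
        return "start"

    def run(self):
        # The parent always calls the hook by this name.
        return self.print_start_line()


class MicrobatchBatchRunner(ModelRunner):
    # Renamed from a batch-specific name (e.g. print_batch_start_line) to
    # the hook the parent calls, so the inherited run() picks it up and no
    # override of the entry point is needed.
    def print_start_line(self):
        return "batch start"
```

With the hook named to match, the child needs no duplicate orchestration code at all.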
…unner` The `MicrobatchModelRunner.compile` does nothing because `MicrobatchModelRunner` only orchestrates the batches of the model to run, and doesn't actually run the SQL of the model. Thus compilation is unnecessary in `MicrobatchModelRunner`.
Of note, implementing `on_skip` for `MicrobatchModelRunner` is unnecessary because the inherited `on_skip` suffices.
Previously `build_jinja_context_batch` was an instance method of `MicrobatchBuilder`. An issue with this is that, with the split of `MicrobatchModelRunner` and `MicrobatchBatchRunner` that now exists, we'd either need to pass the `MicrobatchBuilder` from the `MicrobatchModelRunner` to the `MicrobatchBatchRunner`, or instantiate a new `MicrobatchBuilder` in every `MicrobatchBatchRunner`. The issue with the former is that the passed-in `MicrobatchBuilder` wouldn't have the `compiled_code` on the `model`. We could instead take the latter option, but instantiating a new builder for every batch seems unnecessary when the method can easily become a static method.
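A hedged sketch of the change described above: because the context can be built entirely from its arguments, the method needs no `MicrobatchBuilder` state. The parameter names and return shape here are assumptions for illustration, not dbt-core's actual signature.

```python
class MicrobatchBuilder:
    @staticmethod
    def build_jinja_context_batch(model_node, is_incremental: bool) -> dict:
        # No `self` access: only the arguments are used, so callers don't
        # need to hold (or construct) a MicrobatchBuilder instance.
        return {"model": model_node, "is_incremental": is_incremental}
```

A `MicrobatchBatchRunner` can then call `MicrobatchBuilder.build_jinja_context_batch(...)` directly, with no builder instance to pass around.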
Compare 3e857dd to 35bc7ce
…rially With the orchestration of batches moved onto a runner, the `MicrobatchModelRunner`, sending a `KeyboardInterrupt` to the process no longer stopped execution. This is because we previously relied on closing all active adapter connections to stop currently executing tasks. However, the `MicrobatchModelRunner` doesn't have any active data warehouse connections itself, as adapter connections for batches are opened by the `MicrobatchBatchRunner`. Because of this, closing the connections would cancel a running batch, but the next batch would then be submitted (and open a new connection).

To stop this from happening, we needed a way to stop new batches from being submitted. To do this, we created a new `DbtThreadPool` which tracks whether or not it's been closed. If it's closed, then `_submit_batch` skips the batch entirely.

NOTE: This only works if the batches are running serially. It does not work if the batches are being run concurrently, because the orchestrator submits all of the batches immediately, so a check in `_submit_batch` is ineffective. We'll address this in the next commit.
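A minimal sketch of such a pool, assuming the real `DbtThreadPool` simply records that `close()` was called (the `submit_batch` guard below is illustrative, not dbt-core's exact `_submit_batch`):

```python
from multiprocessing.pool import ThreadPool


class DbtThreadPool(ThreadPool):
    """ThreadPool that remembers whether close() has been called."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._is_closed = False

    def close(self):
        self._is_closed = True
        super().close()

    def is_closed(self) -> bool:
        return self._is_closed


def submit_batch(pool: DbtThreadPool, fn, batch):
    # Guard modeled on the _submit_batch behavior described above:
    # once the pool is closed, skip the batch entirely.
    if pool.is_closed():
        return None
    return pool.apply_async(fn, (batch,))
```

Closing the pool on interrupt then makes every subsequent serial submission a no-op, so no new connection is opened after the running batch is cancelled.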
…rently when interrupted In the previous commit we made it possible to halt microbatch model execution when batches were being executed serially. However, that work did not make microbatch model execution shut down when executing batches concurrently. This change fixes that issue.

Additionally, we deleted a test. Unfortunately, it is no longer reliably possible to test `KeyboardInterrupt`s of microbatch models, as we don't have a way to fire a keyboard interrupt at the right time consistently in our testing environment. The test that existed would hang indefinitely, as a keyboard interrupt was being raised on a thread that was not the main thread (which is impossible in the real world, as keyboard interrupts are always fired from the main thread).
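In the concurrent case all batches are queued up front, so the cancellation check has to move inside the task itself. A hypothetical sketch (names are illustrative, not dbt-core's API) using a shared shutdown flag:

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run_batch(closed: threading.Event, batch: int):
    # Re-check shutdown state at the start of the task: batches queued
    # before shutdown return immediately instead of doing work.
    if closed.is_set():
        return None
    return batch * 2


closed = threading.Event()
with ThreadPoolExecutor(max_workers=2) as pool:
    # All batches are submitted immediately, as in the concurrent case above.
    futures = [pool.submit(run_batch, closed, b) for b in range(4)]
    results = [f.result() for f in futures]
```

With `closed` never set, all four batches run; setting it mid-run makes any not-yet-started batch return without touching the warehouse.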
…execute` The lines for tracking/printing at the end of `MicrobatchModelRunner.execute` are not necessary because the `after_execute` inherited from `ModelRunner` does both of these things. Thus the lines at the end of `MicrobatchModelRunner.execute` were duplicative.
The `MicrobatchBatchRunner` never uses `describe_node` as it instead uses `describe_batch`. Thus, `describe_node` serves no purpose.
Removing this special logic is safe, and the test `TestMicrobatchModelSkipped` confirms this.
```python
from multiprocessing.pool import ThreadPool


class DbtThreadPool(ThreadPool):
```
We created `DbtThreadPool` so that we can have visibility on whether `.close()` has been called on the pool. This class is now used instead of `ThreadPool`.
The backport to
To backport manually, run these commands in your terminal:

```shell
# Fetch latest updates from GitHub
git fetch
# Create a new working tree
git worktree add .worktrees/backport-1.9.latest 1.9.latest
# Navigate to the new working tree
cd .worktrees/backport-1.9.latest
# Create a new branch
git switch --create backport-11332-to-1.9.latest
# Cherry-pick the merged commit of this pull request and resolve the conflicts
git cherry-pick -x --mainline 1 94b6ae13b3c9bf1ae231d0bdc4b81c9d8cf712c0
# Push it to GitHub
git push --set-upstream origin backport-11332-to-1.9.latest
# Go back to the original working tree
cd ../..
# Delete the working tree
git worktree remove .worktrees/backport-1.9.latest
```

Then, create a pull request where the
…ead (#11332)
* Push orchestration of batches previously in the `RunTask` into `MicrobatchModelRunner`
* Split `MicrobatchModelRunner` into two separate runners: `MicrobatchModelRunner` is now an orchestrator of `MicrobatchBatchRunner`s, the latter being what handles actual batch execution
* Introduce new `DbtThreadPool` that knows if it's been closed
* Enable `MicrobatchModelRunner` to shut down gracefully when it detects the thread pool has been closed
…ead (#11332) (#11349)
* Push orchestration of batches previously in the `RunTask` into `MicrobatchModelRunner`
* Split `MicrobatchModelRunner` into two separate runners: `MicrobatchModelRunner` is now an orchestrator of `MicrobatchBatchRunner`s, the latter being what handles actual batch execution
* Introduce new `DbtThreadPool` that knows if it's been closed
* Enable `MicrobatchModelRunner` to shut down gracefully when it detects the thread pool has been closed

Co-authored-by: Michelle Ark <[email protected]>
Resolves #11243
Resolves #11306
Problem
There are two problems, one for each of the issues resolved above.
Solution
Checklist