Skip to content

Commit

Permalink
Allow to temporarily set the current registry even if it is not assoc…
Browse files Browse the repository at this point in the history
…iated with a worker thread
  • Loading branch information
adamreichold committed May 12, 2024
1 parent 3dc6b99 commit 97b2c9e
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 4 deletions.
65 changes: 64 additions & 1 deletion rayon-core/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,36 @@ fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> {
result
}

// This is used to temporarily overwrite the current registry.
//
// This either null, a pointer to the global registry if it was
// ever used to access the global registry or a pointer to a
// registry which is temporarily made current because the current
// thread is not a worker thread but is running a scope associated
// to a specific thread pool.
thread_local! {
static CURRENT_REGISTRY: Cell<*const Arc<Registry>> = const { Cell::new(ptr::null()) };
}

#[cold]
fn set_current_registry_to_global_registry() -> *const Arc<Registry> {
let global = global_registry();

CURRENT_REGISTRY.with(|current_registry| current_registry.set(global));

global
}

fn current_registry() -> *const Arc<Registry> {
let mut current = CURRENT_REGISTRY.with(Cell::get);

if current.is_null() {
current = set_current_registry_to_global_registry();
}

current
}

struct Terminator<'a>(&'a Arc<Registry>);

impl<'a> Drop for Terminator<'a> {
Expand Down Expand Up @@ -315,14 +345,47 @@ impl Registry {
unsafe {
let worker_thread = WorkerThread::current();
let registry = if worker_thread.is_null() {
global_registry()
&*current_registry()
} else {
&(*worker_thread).registry
};
Arc::clone(registry)
}
}

/// Optionally install a specific registry as the current one.
///
/// This is used when a thread which is not a worker executes
/// a scope which should use the specific thread pool instead of
/// the global one.
pub(super) fn with_current<F, R>(registry: Option<&Arc<Registry>>, f: F) -> R
where
F: FnOnce() -> R,
{
struct Guard {
current: *const Arc<Registry>,
}

impl Guard {
fn new(registry: &Arc<Registry>) -> Self {
let current =
CURRENT_REGISTRY.with(|current_registry| current_registry.replace(registry));

Self { current }
}
}

impl Drop for Guard {
fn drop(&mut self) {
CURRENT_REGISTRY.with(|current_registry| current_registry.set(self.current));
}
}

let _guard = registry.map(Guard::new);

f()
}

/// Returns the number of threads in the current registry. This
/// is better than `Registry::current().num_threads()` because it
/// avoids incrementing the `Arc`.
Expand Down
8 changes: 5 additions & 3 deletions rayon-core/src/scope/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,9 +416,11 @@ pub(crate) fn do_in_place_scope<'scope, OP, R>(registry: Option<&Arc<Registry>>,
where
OP: FnOnce(&Scope<'scope>) -> R,
{
let thread = unsafe { WorkerThread::current().as_ref() };
let scope = Scope::<'scope>::new(thread, registry);
scope.base.complete(thread, || op(&scope))
Registry::with_current(registry, || {
let thread = unsafe { WorkerThread::current().as_ref() };
let scope = Scope::<'scope>::new(thread, registry);
scope.base.complete(thread, || op(&scope))
})
}

/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
Expand Down

0 comments on commit 97b2c9e

Please sign in to comment.