Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a benchmark/example for numexpr usage under free-threading conditions #508

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 154 additions & 0 deletions bench/free_threading.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
#################################################################################
# To mimic the scenario that computation is i/o bound and constrained by memory
#
# It's a much simplified version that the chunk is computed in a loop,
# and expression is evaluated in a sequence, which is not true in reality.
# Neverthless, numexpr outperforms numpy.
#################################################################################
"""
Benchmarking Expression 1:
NumPy time (threaded over 32 chunks with 2 threads): 4.612313 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 2 threads): 0.951172 seconds
numexpr speedup: 4.85x
----------------------------------------
Benchmarking Expression 2:
NumPy time (threaded over 32 chunks with 2 threads): 23.862752 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 2 threads): 2.182058 seconds
numexpr speedup: 10.94x
----------------------------------------
Benchmarking Expression 3:
NumPy time (threaded over 32 chunks with 2 threads): 20.594895 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 2 threads): 2.927881 seconds
numexpr speedup: 7.03x
----------------------------------------
Benchmarking Expression 4:
NumPy time (threaded over 32 chunks with 2 threads): 12.834101 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 2 threads): 5.392480 seconds
numexpr speedup: 2.38x
----------------------------------------
"""

import os

os.environ["NUMEXPR_NUM_THREADS"] = "1"
import threading
import timeit

import numpy as np

import numexpr as ne

array_size = 10**8
num_runs = 10
num_chunks = 32 # Number of chunks
num_threads = 16 # Number of threads constrained by how many chunks memory can hold

a = np.random.rand(array_size).reshape(10**4, -1)
b = np.random.rand(array_size).reshape(10**4, -1)
c = np.random.rand(array_size).reshape(10**4, -1)

chunk_size = array_size // num_chunks

expressions_numpy = [
lambda a, b, c: a + b * c,
lambda a, b, c: a**2 + b**2 - 2 * a * b * np.cos(c),
lambda a, b, c: np.sin(a) + np.log(b) * np.sqrt(c),
lambda a, b, c: np.exp(a) + np.tan(b) - np.sinh(c),
]

expressions_numexpr = [
"a + b * c",
"a**2 + b**2 - 2 * a * b * cos(c)",
"sin(a) + log(b) * sqrt(c)",
"exp(a) + tan(b) - sinh(c)",
]


def benchmark_numpy_chunk(func, a, b, c, results, indices):
for index in indices:
start = index * chunk_size
end = (index + 1) * chunk_size
time_taken = timeit.timeit(
lambda: func(a[start:end], b[start:end], c[start:end]), number=num_runs
)
results.append(time_taken)


def benchmark_numexpr_re_evaluate(expr, a, b, c, results, indices):
for index in indices:
start = index * chunk_size
end = (index + 1) * chunk_size
# if index == 0:
# Evaluate the first chunk with evaluate
time_taken = timeit.timeit(
lambda: ne.evaluate(
expr,
local_dict={
"a": a[start:end],
"b": b[start:end],
"c": c[start:end],
},
),
number=num_runs,
)
# else:
# Re-evaluate subsequent chunks with re_evaluate
# time_taken = timeit.timeit(
# lambda: ne.re_evaluate(
# local_dict={"a": a[start:end], "b": b[start:end], "c": c[start:end]}
# ),
# number=num_runs,
# )
results.append(time_taken)


def run_benchmark_threaded():
chunk_indices = list(range(num_chunks))

for i in range(len(expressions_numpy)):
print(f"Benchmarking Expression {i+1}:")

results_numpy = []
results_numexpr = []

threads_numpy = []
for j in range(num_threads):
indices = chunk_indices[j::num_threads] # Distribute chunks across threads
thread = threading.Thread(
target=benchmark_numpy_chunk,
args=(expressions_numpy[i], a, b, c, results_numpy, indices),
)
threads_numpy.append(thread)
thread.start()

for thread in threads_numpy:
thread.join()

numpy_time = sum(results_numpy)
print(
f"NumPy time (threaded over {num_chunks} chunks with {num_threads} threads): {numpy_time:.6f} seconds"
)

threads_numexpr = []
for j in range(num_threads):
indices = chunk_indices[j::num_threads] # Distribute chunks across threads
thread = threading.Thread(
target=benchmark_numexpr_re_evaluate,
args=(expressions_numexpr[i], a, b, c, results_numexpr, indices),
)
threads_numexpr.append(thread)
thread.start()

for thread in threads_numexpr:
thread.join()

numexpr_time = sum(results_numexpr)
print(
f"numexpr time (threaded with re_evaluate over {num_chunks} chunks with {num_threads} threads): {numexpr_time:.6f} seconds"
)
print(f"numexpr speedup: {numpy_time / numexpr_time:.2f}x")
print("-" * 40)


if __name__ == "__main__":
run_benchmark_threaded()
10 changes: 3 additions & 7 deletions numexpr/necompiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -775,14 +775,12 @@ def getArguments(names, local_dict=None, global_dict=None, _frame_depth: int=2):


# Dictionaries for caching variable names and compiled expressions
# _names_cache = CacheDict(256)
_names_cache = threading.local()
# _numexpr_cache = CacheDict(256)
_numexpr_cache = threading.local()
# _numexpr_last = ContextDict()
_numexpr_last = threading.local()
evaluate_lock = threading.Lock()


def validate(ex: str,
local_dict: Optional[Dict] = None,
global_dict: Optional[Dict] = None,
Expand Down Expand Up @@ -856,7 +854,6 @@ def validate(ex: str,
----

"""
global _numexpr_last
if not hasattr(_numexpr_last, 'l'):
_numexpr_last.l = ContextDict()

Expand Down Expand Up @@ -998,7 +995,6 @@ def re_evaluate(local_dict: Optional[Dict] = None,
The calling frame depth. Unless you are a NumExpr developer you should
not set this value.
"""
global _numexpr_last
if not hasattr(_numexpr_last, 'l'):
_numexpr_last.l = ContextDict()

Expand All @@ -1009,5 +1005,5 @@ def re_evaluate(local_dict: Optional[Dict] = None,
argnames = _numexpr_last.l['argnames']
args = getArguments(argnames, local_dict, global_dict, _frame_depth=_frame_depth)
kwargs = _numexpr_last.l['kwargs']
with evaluate_lock:
return compiled_ex(*args, **kwargs)
# with evaluate_lock:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrancescAlted, it seems that this lock is a performance bottleneck when multiple threads are ran in parallel, is it necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the lock:

Benchmarking Expression 1:
NumPy time (threaded over 32 chunks with 16 threads): 3.276298 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 16 threads): 9.975059 seconds
numexpr speedup: 0.33x
----------------------------------------
Benchmarking Expression 2:
NumPy time (threaded over 32 chunks with 16 threads): 18.981946 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 16 threads): 50.327974 seconds
numexpr speedup: 0.38x
----------------------------------------
Benchmarking Expression 3:
NumPy time (threaded over 32 chunks with 16 threads): 20.414158 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 16 threads): 70.900648 seconds
numexpr speedup: 0.29x
----------------------------------------
Benchmarking Expression 4:
NumPy time (threaded over 32 chunks with 16 threads): 38.012808 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 16 threads): 160.476216 seconds
numexpr speedup: 0.24x
----------------------------------------

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without locking:

Benchmarking Expression 1:
NumPy time (threaded over 32 chunks with 16 threads): 3.415349 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 16 threads): 2.618876 seconds
numexpr speedup: 1.30x
----------------------------------------
Benchmarking Expression 2:
NumPy time (threaded over 32 chunks with 16 threads): 19.005238 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 16 threads): 12.611407 seconds
numexpr speedup: 1.51x
----------------------------------------
Benchmarking Expression 3:
NumPy time (threaded over 32 chunks with 16 threads): 20.555149 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 16 threads): 17.690749 seconds
numexpr speedup: 1.16x
----------------------------------------
Benchmarking Expression 4:
NumPy time (threaded over 32 chunks with 16 threads): 38.338372 seconds
numexpr time (threaded with re_evaluate over 32 chunks with 16 threads): 35.074684 seconds
numexpr speedup: 1.09x
----------------------------------------

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uh, that was introduced long ago, so I don't remember at all; but if it is there, I'd say that it is necessary, yes. Just to double check, what happens to the test suite if you remove the lock?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests are passing locally and on CI

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I did my tests locally on a series of packages that depend on numexpr, and everything seems fine with removing the lock. So, feel free in proceeding with the lock removal.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the lock was in place to prevent concurrent access to the caches, which shouldn't be much of an issue now, given they were made thread-local

return compiled_ex(*args, **kwargs)