diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4a2ca8a..eae9e39 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,9 +1,9 @@ repos: - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.4.8 # Use the latest version + rev: v0.4.8 # Use the latest version hooks: - id: ruff - args: [--fix] # Optional: to enable lint fixes + args: [--fix] # Optional: to enable lint fixes - id: ruff-format - repo: https://github.com/pre-commit/pre-commit-hooks rev: v4.6.0 diff --git a/docs/examples/tissue/plot_two_cxm.py b/docs/examples/tissue/plot_two_cxm.py new file mode 100644 index 0000000..3e0639b --- /dev/null +++ b/docs/examples/tissue/plot_two_cxm.py @@ -0,0 +1,51 @@ +""" +============= +The Two Compartment Exchange Model +============= + +Simulating tissue concentrations from two compartment models with different settings. +""" + +import matplotlib.pyplot as plt + +# %% +# Import necessary packages +import numpy as np +import osipi + +# %% +# Generate Parker AIF with default settings. + +# Define time points in units of seconds - in this case we use a time +# resolution of 1 sec and a total duration of 6 minutes. +t = np.arange(0, 5 * 60, 1, dtype=float) + +# Create an AIF with default settings +ca = osipi.aif_parker(t) + +# %% +# Plot the tissue concentrations for an extracellular volume fraction +# of 0.2, plasma volume fraction of 0.1, permeability serface area of 5 ml/min +# and flow rate of 10 ml/min +PS = 15 # Permeability surface area product in ml/min +Fp = [5, 25] # Flow rate in ml/min +ve = 0.1 # Extracellular volume fraction +vp = [0.1, 0.02] # Plasma volume fraction + +ct = osipi.two_compartment_exchange_model(t, ca, Fp=Fp[0], PS=PS, ve=ve, vp=vp[0]) +plt.plot(t, ct, "b-", label=f" Fp = {Fp[0]},PS = {PS}, ve = {ve}, vp = {vp[0]}") + +ct = osipi.two_compartment_exchange_model(t, ca, Fp=Fp[1], PS=PS, ve=ve, vp=vp[0]) +plt.plot(t, ct, "r-", label=f" Fp = {Fp[1]},PS = {PS}, ve = {ve}, vp = {vp[0]}") + +ct = osipi.two_compartment_exchange_model(t, ca, Fp=Fp[0], PS=PS, ve=ve, vp=vp[1]) +plt.plot(t, ct, "g-", label=f" Fp = {Fp[0]},PS = {PS}, ve = {ve}, vp = {vp[1]}") + +ct = osipi.two_compartment_exchange_model(t, ca, Fp=Fp[1], PS=PS, ve=ve, vp=vp[1]) +plt.plot(t, ct, "y-", label=f" Fp = {Fp[1]},PS = {PS}, ve = {ve}, vp = {vp[1]}") + + +plt.xlabel("Time (sec)") +plt.ylabel("Tissue concentration (mM)") +plt.legend() +plt.show() diff --git a/docs/references/models/tissue_models/2cxm.md b/docs/references/models/tissue_models/2cxm.md new file mode 100644 index 0000000..68f2f7f --- /dev/null +++ b/docs/references/models/tissue_models/2cxm.md @@ -0,0 +1,17 @@ +# osipi.Two Compartment Exchange Model + +::: osipi.two_compartment_exchange_model + + +## Example using `osipi.two_compartment_exchange_model` + +
+ The Two Compartment Exchange Model +
+ + + The Two Compartment Exchange Model + + +
+
diff --git a/docs/references/models/tissue_models/index.md b/docs/references/models/tissue_models/index.md index 298b575..5584f43 100644 --- a/docs/references/models/tissue_models/index.md +++ b/docs/references/models/tissue_models/index.md @@ -1,3 +1,4 @@ - [tofts](tofts.md) - [extended_tofts](extended_tofts.md) +- [two_cxm](2cxm.md) diff --git a/mkdocs.yml b/mkdocs.yml index a742a1c..183e016 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -75,6 +75,7 @@ nav: - references/models/tissue_models/index.md - osipi.tofts: references/models/tissue_models/tofts.md - osipi.extended_tofts: references/models/tissue_models/extended_tofts.md + - osipi.two_compartment_exchange: references/models/tissue_models/2cxm.md - Examples: generated/gallery diff --git a/src/osipi/__init__.py b/src/osipi/__init__.py index 454bb7e..ffe955b 100644 --- a/src/osipi/__init__.py +++ b/src/osipi/__init__.py @@ -7,7 +7,8 @@ from ._tissue import ( tofts, - extended_tofts + extended_tofts, + two_compartment_exchange_model ) from ._signal import ( diff --git a/src/osipi/_tissue.py b/src/osipi/_tissue.py index 372b41c..15bdf5f 100755 --- a/src/osipi/_tissue.py +++ b/src/osipi/_tissue.py @@ -2,6 +2,7 @@ import numpy as np from scipy.interpolate import interp1d +from scipy.signal import convolve from ._convolution import exp_conv @@ -332,3 +333,165 @@ def extended_tofts( ct = ct_func(t) return ct + + +def two_compartment_exchange_model( + t: np.ndarray, + ca: np.ndarray, + Fp: float, + PS: float, + ve: float, + vp: float, + Ta: float = 30.0, +) -> np.ndarray: + """Two compartment exchange model + + Tracer flows from the AIF to the blood plasma compartment; two-way leakage + between the plasma and extracellular compartments(EES) is permitted. + + Args: + t: 1D array of times(s). [OSIPI code is Q.GE1.004] + ca: Arterial concentrations in mM for each time point in t. [OSIPI code is Q.IC1.001.[a]] + Fp: Blood plasma flow rate into a unit tissue volume in ml/min. [OSIPI code is Q.PH1.002] + PS: Permeability surface area product in ml/min. [OSIPI code is Q.PH1.004] + ve: Extracellular volume fraction. [OSIPI code Q.PH1.001.[e]] + vp: Plasma volume fraction. [OSIPI code Q.PH1.001.[p]] + Ta: Arterial delay time, i.e., + difference in onset time between tissue curve and AIF in units of sec. [OSIPI code Q.PH1.007] + + Returns: + Ct: Tissue concentrations in mM + + See Also: + `extended_tofts` + + References: + - Lexicon url: https://osipi.github.io/OSIPI_CAPLEX/perfusionModels/#indicator-kinetic-models + - Lexicon code: M.IC1.009 + - OSIPI name: Two Compartment Exchange Model + - Adapted from contributions by: MJT_UoEdinburgh_UK + + Example: + Create an array of time points covering 6 min in steps of 1 sec, + calculate the Parker AIF at these time points, calculate tissue concentrations + using the Two Compartment Exchange model and plot the results. + + >>> import matplotlib.pyplot as plt + >>> import osipi + + Calculate AIF + + >>> t = np.arange(0, 6 * 60, 0.1) + >>> ca = osipi.aif_parker(t) + + Plot the tissue concentrations for an extracellular volume fraction + of 0.2, plasma volume fraction of 0.1, permeability serface area of 5 ml/min + and flow rate of 10 ml/min + + >>> PS = 5 # Permeability surface area product in ml/min + >>> Fp = 10 # Flow rate in ml/min + >>> ve = 0.2 # Extracellular volume fraction + >>> vp = 0.1 # Plasma volume fraction + >>> ct = osipi.two_compartment_exchange_model(t, ca, Fp, PS, ve, vp) + >>> plt.plot(t, ca, "r", t, ct, "g") + + """ + # Input validation + if t.size != ca.size: + raise ValueError(f"t and ca must have the same size, got {t.size} and {ca.size}") + + for param, name in [(Fp, "Fp"), (PS, "PS"), (ve, "ve"), (vp, "vp")]: + if not (np.isscalar(param) and np.isreal(param)): + raise TypeError(f"{name} must be a numeric scalar value, got {type(param).__name__}") + + if Fp <= 0: + raise ValueError(f"Fp must be positive, got {Fp}") + if PS < 0: + raise ValueError(f"PS must be non-negative, got {PS}") + if not (0 <= ve <= 1): + raise ValueError(f"ve must be in range [0, 1], got {ve}") + if not (0 <= vp <= 1): + raise ValueError(f"vp must be in range [0, 1], got {vp}") + if (ve + vp) > 1: + raise ValueError(f"Sum of ve ({ve}) and vp ({vp}) exceeds 1") + + if vp == 0: + E = 1 - np.exp(-PS / Fp) + Ktrans = E * Fp + return tofts(t, ca, Ktrans, ve, Ta, discretization_method="conv") + + if not np.allclose(np.diff(t), np.diff(t)[0]): + warnings.warn( + ("Non-uniform time spacing detected. Time array may be" " resampled."), + stacklevel=2, + ) + + # Convert units + fp_per_s = Fp / (60.0 * 100.0) + ps_per_s = PS / (60.0 * 100.0) + + # Calculate the impulse response function + v = ve + vp + + # Mean transit time + T = v / fp_per_s + tc = vp / fp_per_s + te = ve / ps_per_s + + upsample_factor = 1 + n = t.size + n_upsample = (n - 1) * upsample_factor + 1 + t_upsample = np.linspace(t[0], t[-1], n_upsample) + tau_upsample = t_upsample - t[0] + + sig_p = ((T + te) + np.sqrt((T + te) ** 2 - 4 * tc * te)) / (2 * tc * te) + sig_n = ((T + te) - np.sqrt((T + te) ** 2 - 4 * tc * te)) / (2 * tc * te) + + # Calculate the impulse response function for the plasma compartment and EES + + irf_cp = ( + vp + * sig_p + * sig_n + * ( + (1 - te * sig_n) * np.exp(-tau_upsample * sig_n) + + (te * sig_p - 1.0) * np.exp(-tau_upsample * sig_p) + ) + / (sig_p - sig_n) + ) + + irf_ce = ( + ve + * sig_p + * sig_n + * (np.exp(-tau_upsample * sig_n) - np.exp(-tau_upsample * sig_p)) + / (sig_p - sig_n) + ) + + irf_cp[[0]] /= 2 + irf_ce[[0]] /= 2 + + dt = np.min(np.diff(t)) / upsample_factor + + if Ta != 0: + f = interp1d( + t, + ca, + kind="linear", + bounds_error=False, + fill_value=0, + ) + ca = (t > Ta) * f(t - Ta) + + # get concentration in plasma and EES + Cp = dt * convolve(ca, irf_cp, mode="full", method="auto")[: len(t)] + Ce = dt * convolve(ca, irf_ce, mode="full", method="auto")[: len(t)] + + t_upsample = np.linspace(t[0], t[-1], n_upsample) + + Cp = np.interp(t, t_upsample, Cp) + Ce = np.interp(t, t_upsample, Ce) + + # get tissue concentration + Ct = Cp + Ce + return Ct diff --git a/tests/test_tissue.py b/tests/test_tissue.py index 304cba9..e2f413d 100755 --- a/tests/test_tissue.py +++ b/tests/test_tissue.py @@ -2,20 +2,31 @@ import numpy as np import osipi +import pytest -def test_tissue_tofts(): - """1. +@pytest.fixture +def time_array(): + return np.arange(0, 6 * 60, 1) - Basic operation of the function - test that the peak tissue concentration is less than the peak + +@pytest.fixture +def aif(time_array): + return osipi.aif_parker(time_array) + + +@pytest.fixture +def base_params(): + return {"Ktrans": 0.6, "ve": 0.2} + + +def test_tissue_tofts(time_array, aif, base_params): + """1. Basic operation of the function - test that the peak tissue concentration is less than the peak AIF """ - - t = np.linspace(0, 6 * 60, 360) - ca = osipi.aif_parker(t) - ct = osipi.tofts(t, ca, Ktrans=0.6, ve=0.2) - assert np.round(np.max(ct)) < np.round(np.max(ca)) + ct = osipi.tofts(time_array, aif, discretization_method="exp", **base_params) + assert np.round(np.max(ct)) < np.round(np.max(aif)) # 2. Basic operation of the function - test with non-uniform spacing of # time array @@ -26,45 +37,42 @@ def test_tissue_tofts(): # 3. The offset option - test that the tissue concentration is shifted # from the AIF by the specified offset time - t = np.arange(0, 6 * 60, 1) - ca = osipi.aif_parker(t) - ct = osipi.tofts(t, ca, Ktrans=0.6, ve=0.2, Ta=60.0) - assert (np.min(np.where(ct > 0.0)) - np.min(np.where(ca > 0.0)) - 1) * 1 == 60.0 + ct = osipi.tofts(time_array, aif, Ta=60.0, **base_params) + assert (np.min(np.where(ct > 0.0)) - np.min(np.where(aif > 0.0)) - 1) * 1 == 60.0 # 4. Test that the discretization options give almost the same result - # time step must be very small t = np.arange(0, 6 * 60, 0.01) ca = osipi.aif_parker(t) - ct_conv = osipi.tofts(t, ca, Ktrans=0.6, ve=0.2) - ct_exp = osipi.tofts(t, ca, Ktrans=0.6, ve=0.2, discretization_method="exp") + ct_conv = osipi.tofts(t, ca, **base_params) + ct_exp = osipi.tofts(t, ca, discretization_method="exp", **base_params) assert np.allclose(ct_conv, ct_exp, rtol=1e-4, atol=1e-3) # 5. Test that the ratio of the area under the ct and ca curves is # approximately the extracellular volume - t = np.arange(0, 6 * 60, 1) - ca = osipi.aif_parker(t) - ct_conv = osipi.tofts(t, ca, Ktrans=0.6, ve=0.2) - ct_exp = osipi.tofts(t, ca, Ktrans=0.6, ve=0.2, discretization_method="exp") - assert math.isclose(np.trapz(ct_conv, t) / np.trapz(ca, t), 0.2, abs_tol=1e-1) - assert math.isclose(np.trapz(ct_exp, t) / np.trapz(ca, t), 0.2, abs_tol=1e-1) + + ct_conv = osipi.tofts(time_array, aif, **base_params) + ct_exp = osipi.tofts(time_array, aif, discretization_method="exp", **base_params) + assert math.isclose( + np.trapz(ct_conv, time_array) / np.trapz(aif, time_array), 0.2, abs_tol=1e-1 + ) + assert math.isclose(np.trapz(ct_exp, time_array) / np.trapz(aif, time_array), 0.2, abs_tol=1e-1) # 6. Test specific use cases - t = np.arange(0, 6 * 60, 1) - ca = osipi.aif_parker(t) - ct_conv = osipi.tofts(t, ca, Ktrans=0, ve=0.2) + ct_conv = osipi.tofts(time_array, aif, Ktrans=0, ve=0.2) assert np.count_nonzero(ct_conv) == 0 - ct_exp = osipi.tofts(t, ca, Ktrans=0, ve=0.2, discretization_method="exp") + ct_exp = osipi.tofts(time_array, aif, Ktrans=0, ve=0.2, discretization_method="exp") assert np.count_nonzero(ct_exp) == 0 - ct_conv = osipi.tofts(t, ca, Ktrans=0.6, ve=0) + ct_conv = osipi.tofts(time_array, aif, Ktrans=0.6, ve=0) assert np.count_nonzero(ct_conv) == 0 - ct_exp = osipi.tofts(t, ca, Ktrans=0.6, ve=0, discretization_method="exp") + ct_exp = osipi.tofts(time_array, aif, Ktrans=0.6, ve=0, discretization_method="exp") assert np.count_nonzero(ct_exp) == 0 -def test_tissue_extended_tofts(): +def test_tissue_extended_tofts(time_array, aif): # 1. Basic operation of the function - test that the peak tissue # concentration is less than the peak AIF t = np.linspace(0, 6 * 60, 360) @@ -81,10 +89,8 @@ def test_tissue_extended_tofts(): # 3. The offset option - test that the tissue concentration is shifted # from the AIF by the specified offset time - t = np.arange(0, 6 * 60, 1) - ca = osipi.aif_parker(t) - ct = osipi.extended_tofts(t, ca, Ktrans=0.6, ve=0.2, vp=0.3, Ta=60.0) - assert (np.min(np.where(ct > 0.0)) - np.min(np.where(ca > 0.0)) - 1) * 1 == 60.0 + ct = osipi.extended_tofts(time_array, aif, Ktrans=0.6, ve=0.2, vp=0.3, Ta=60.0) + assert (np.min(np.where(ct > 0.0)) - np.min(np.where(aif > 0.0)) - 1) * 1 == 60.0 # 4. Test that the discretization options give almost the same result - # time step must be very small @@ -96,36 +102,100 @@ def test_tissue_extended_tofts(): # 5. Test that the ratio of the area under the ct and ca curves is # approximately the extracellular volume plus the plasma volume - - t = np.arange(0, 6 * 60, 1) - ca = osipi.aif_parker(t) - ct_conv = osipi.extended_tofts(t, ca, Ktrans=0.6, ve=0.2, vp=0.3) - ct_exp = osipi.extended_tofts(t, ca, Ktrans=0.6, ve=0.2, vp=0.3, discretization_method="exp") + ct_conv = osipi.extended_tofts(time_array, aif, Ktrans=0.6, ve=0.2, vp=0.3) + ct_exp = osipi.extended_tofts( + time_array, aif, Ktrans=0.6, ve=0.2, vp=0.3, discretization_method="exp" + ) assert math.isclose( - np.trapz(ct_conv, t) / np.trapz(ca, t), + np.trapz(ct_conv, time_array) / np.trapz(aif, time_array), 0.2 + 0.3, abs_tol=1e-1, ) - assert math.isclose(np.trapz(ct_exp, t) / np.trapz(ca, t), 0.2 + 0.3, abs_tol=1e-1) + assert math.isclose( + np.trapz(ct_exp, time_array) / np.trapz(aif, time_array), 0.2 + 0.3, abs_tol=1e-1 + ) # 6. Test specific use cases - t = np.arange(0, 6 * 60, 1) - ca = osipi.aif_parker(t) - ct_conv = osipi.extended_tofts(t, ca, Ktrans=0, ve=0.2, vp=0.3) - assert np.allclose(ct_conv, ca * 0.3, rtol=1e-4, atol=1e-3) - ct_exp = osipi.extended_tofts(t, ca, Ktrans=0, ve=0.2, vp=0.3, discretization_method="exp") - assert np.allclose(ct_conv, ca * 0.3, rtol=1e-4, atol=1e-3) + ct_conv = osipi.extended_tofts(time_array, aif, Ktrans=0, ve=0.2, vp=0.3) + assert np.allclose(ct_conv, aif * 0.3, rtol=1e-4, atol=1e-3) - ct_conv = osipi.extended_tofts(t, ca, Ktrans=0.6, ve=0, vp=0.3) - assert np.allclose(ct_conv, ca * 0.3, rtol=1e-4, atol=1e-3) + ct_exp = osipi.extended_tofts( + time_array, aif, Ktrans=0, ve=0.2, vp=0.3, discretization_method="exp" + ) + assert np.allclose(ct_conv, aif * 0.3, rtol=1e-4, atol=1e-3) + + ct_conv = osipi.extended_tofts(time_array, aif, Ktrans=0.6, ve=0, vp=0.3) + assert np.allclose(ct_conv, aif * 0.3, rtol=1e-4, atol=1e-3) + + ct_exp = osipi.extended_tofts( + time_array, aif, Ktrans=0.6, ve=0, vp=0.3, discretization_method="exp" + ) + assert np.allclose(ct_conv, aif * 0.3, rtol=1e-4, atol=1e-3) + + +@pytest.mark.parametrize( + "time_points", + [ + (np.linspace(0, 6 * 60, 360), "uniform"), + (np.geomspace(1, 6 * 60 + 1, num=360) - 1, "non-uniform"), + ], +) +def test_2cxm_basic_operation(time_points): + """Test that the peak tissue concentration is less than the peak AIF for different time arrays""" + t, _ = time_points + ca = osipi.aif_parker(t) + ct = osipi.two_compartment_exchange_model(t, ca, Fp=10, PS=5, ve=0.2, vp=0.3) + assert np.round(np.max(ct)) < np.round(np.max(ca)) - ct_exp = osipi.extended_tofts(t, ca, Ktrans=0.6, ve=0, vp=0.3, discretization_method="exp") - assert np.allclose(ct_conv, ca * 0.3, rtol=1e-4, atol=1e-3) +def test_2cxm_time_offset(time_array, aif): + """Test that the tissue concentration is shifted from the AIF by the specified offset time""" + ct = osipi.two_compartment_exchange_model(time_array, aif, Fp=10, PS=5, ve=0.2, vp=0.3, Ta=60.0) + assert (np.min(np.where(ct > 0.0)) - np.min(np.where(aif > 0.0)) - 1) * 1 == 60.0 + + +def test_2cxm_zero_vp_matches_tofts(time_array, aif): + """Test that 2CXM with vp=0 behaves like the Tofts model""" + ct = osipi.two_compartment_exchange_model(time_array, aif, Fp=10, PS=5, ve=0.2, vp=0) + ct_tofts = osipi.tofts(time_array, aif, Ktrans=3.93, ve=0.2) + assert np.allclose(ct, ct_tofts, rtol=1e-4, atol=1e-3) + + +def test_2cxm_invalid_inputs(time_array, aif): + """Test invalid inputs raise appropriate errors""" + # Test mismatched array sizes + with pytest.raises(ValueError, match="t and ca must have the same size"): + osipi.two_compartment_exchange_model(time_array[:10], aif, Fp=10, PS=5, ve=0.2, vp=0.1) + + # Test invalid parameter types and values + invalid_params = [ + ({"Fp": [10]}, TypeError, "Fp must be a numeric scalar value"), # Non-scalar Fp + ({"PS": "five"}, TypeError, "PS must be a numeric scalar value"), # Non-numeric PS + ({"ve": np.array([0.2])}, TypeError, "ve must be a numeric scalar value"), # Array ve + ({"vp": None}, TypeError, "vp must be a numeric scalar value"), # None value + ({"Fp": -5}, ValueError, "Fp must be positive"), # Negative Fp + ({"PS": -1}, ValueError, "PS must be non-negative"), # Negative PS + ({"ve": -0.1}, ValueError, "ve must be in range \[0, 1\]"), # ve < 0 + ({"vp": 1.1}, ValueError, "vp must be in range \[0, 1\]"), # vp > 1 + ( + {"ve": 0.8, "vp": 0.3}, + ValueError, + "Sum of ve \(0.8\) and vp \(0.3\) exceeds 1", + ), # ve + vp > 1 + ] + + for params, err_type, err_msg in invalid_params: + with pytest.raises(err_type, match=err_msg): + osipi.two_compartment_exchange_model( + time_array, + aif, + Fp=10 if "Fp" not in params else params["Fp"], + PS=5 if "PS" not in params else params["PS"], + ve=0.2 if "ve" not in params else params["ve"], + vp=0.1 if "vp" not in params else params["vp"], + ) -if __name__ == "__main__": - test_tissue_tofts() - test_tissue_extended_tofts() - print("All tissue concentration model tests passed!!") +if __name__ == "__main__": + pytest.main([__file__])