diff --git a/CALIB/Dockerfile b/CALIB/Dockerfile new file mode 100644 index 0000000..56d9106 --- /dev/null +++ b/CALIB/Dockerfile @@ -0,0 +1,23 @@ +# Use Rocky Linux as base +FROM rockylinux:9 + +# Install Python 3.12 and dependencies +RUN python3 -m ensurepip && \ + python3 -m pip install --upgrade pip + +# Set the working directory inside the container +WORKDIR /app + +# Copy the requirements file and install dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy your script into the container +COPY laser.py . +COPY objective.py . +COPY worker.py . +COPY mysql_storage.py . + +# Run the script when the container starts +#CMD ["python3", "laser.py", "-p", "shared/params.json", "-o", "shared/simulation_results.csv"] +CMD ["python3", "worker.py"] diff --git a/CALIB/README.rst b/CALIB/README.rst new file mode 100644 index 0000000..4a42b74 --- /dev/null +++ b/CALIB/README.rst @@ -0,0 +1,122 @@ +LASER Calibration with Optuna +============================= + +This repository demonstrates how to calibrate a spatial **Susceptible-Infected-Recovered (SIR) model** using **Optuna**, an optimization framework for hyperparameter tuning. + +Our goal is to identify **transmission rate** and **migration rate** values that produce a **second prevalence peak** in **node 0**, with a noticeable **trough between the peaks**. + +Model Overview +-------------- + +The core model (``laser.py``) implements a spatial **SIR model** with the following characteristics: + +- **500,000 agents** spread **heterogeneously** across **20 nodes**. +- **Node 0** is the **largest**, while **node 19** is the **smallest**. +- The outbreak is **seeded in 1%** of the **population of node 0**. +- **Gravity-based migration** determines agent movement between nodes. +- Infections last **5 to 15 days** (uniformly distributed). +- **Configurable** transmission and migration rates. + +Calibration Goal +---------------- + +We use **Optuna** to optimize the **transmission rate** and **migration rate** to achieve: + +✅ A **second prevalence peak** in node 0. +✅ A **clear trough** between the two peaks. + +The calibration process runs multiple simulations, adjusting parameters until the desired epidemic curve is achieved. + +Files and Structure +------------------- + +- ``laser.py`` – Implements the SIR model. +- ``run.py`` – Main calibration script (starts the Optuna process). +- ``objective.py`` – Defines the **objective function**, evaluating how well each trial matches the target epidemic curve. +- ``impatient.py`` – Allows you to inspect the **current best parameters** while calibration is still running. +- ``optuna_review_contour.py`` – Generates **Optuna visualizations** to analyze the search process. + +Installation +------------ + +Before running the calibration, install the required dependencies: + +.. code-block:: bash + + pip install -r requirements.txt + +Running Calibration +------------------- + +To start calibration, run: + +.. code-block:: bash + + python run.py + +By default, this will: + +- Run **100 trials** with **4 replicates** per trial. +- Simulate **500 timesteps per run** (each taking ~10 seconds). +- Identify the best parameter set and run **a final simulation** with those values. + +Checking Calibration Progress +----------------------------- + +To monitor the best parameters found so far, run: + +.. code-block:: bash + + python impatient.py + +To visualize the parameter search space explored by Optuna, run: + +.. 
code-block:: bash
+
+    python optuna_review_contour.py
+
+Expected Results
+----------------
+
+If calibration is successful, the final prevalence plot for **node 0** should display:
+
+✅ A **clear second peak** in infections.
+✅ A **noticeable trough** between the two peaks.
+
+You can modify parameters in the scripts to explore different calibration behaviors.
+
+Next Steps
+----------
+
+- Try adjusting the search space or evaluation criteria in ``objective.py``.
+- Increase the number of trials to improve calibration accuracy.
+- Experiment with different outbreak seeding strategies.
+
+Summary
+-------
+
+This project demonstrates **LASER and Optuna-based epidemic model calibration** and is designed for researchers interested in large-scale spatial disease modeling and parameter estimation.
+
+Dockerized
+----------
+
+Network Start
+^^^^^^^^^^^^^
+
+.. code-block:: bash
+
+    docker network create optuna-network
+
+DB Start
+^^^^^^^^
+
+.. code-block:: bash
+
+    docker run -d --name optuna-mysql --network optuna-network -p 3306:3306 -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -e MYSQL_DATABASE=optuna_db mysql:latest
+
+Optuna Workers
+^^^^^^^^^^^^^^
+
+.. code-block:: bash
+
+    docker build -t docker.io/library/calib-worker:latest .
+    docker run --rm --name calib-worker --network optuna-network -e STORAGE_URL="mysql+pymysql://root@optuna-mysql/optuna_db" docker.io/library/calib-worker:latest &
+
+View Results
+^^^^^^^^^^^^
+
+.. code-block:: bash
+
+    python3 impatient_mysqldocker.py
diff --git a/CALIB/calib_arch_roles_steps2.dot b/CALIB/calib_arch_roles_steps2.dot
new file mode 100644
index 0000000..0dad0df
--- /dev/null
+++ b/CALIB/calib_arch_roles_steps2.dot
@@ -0,0 +1,69 @@
+digraph OptunaCalibrationWorkflow {
+    rankdir=TB;
+    node [shape=box, fontname="Arial"];
+
+    # Define roles (placed outside of steps)
+    modeler [label="Modeler (User)", style=filled, fillcolor=lightblue, shape=ellipse];
+    developer [label="Developer", style=filled, fillcolor=lightgreen, shape=ellipse];
+    it_admin [label="IT Admin", style=filled, fillcolor=lightgray, shape=ellipse];
+
+    # Step 1: Model Definition (Only Modeler)
+    subgraph cluster_step1 {
+        label="1️⃣ Model Definition (Python Only)";
+        style=dashed;
+
+        model_definition [label="Define Model Code (Python)", style=filled, fillcolor=lightblue];
+        model_file [label="model.py", shape=parallelogram, style=filled, fillcolor=white];
+
+        model_definition -> model_file [label="Writes"];
+    }
+    modeler -> model_definition [label="Defines Model"];
+
+    # Step 2: Local Calibration (Modeler + Developer)
+    subgraph cluster_step2 {
+        label="2️⃣ Local Calibration (Docker)";
+        style=dashed;
+
+        objective_function [label="Define Calibration Inputs/Outputs\n(objective.py)", style=filled, fillcolor=lightblue];
+        local_run [label="Run Local Calibration (`run.py`)", style=filled, fillcolor=lightblue];
+
+        local_workers [label="Local Worker Containers\n(laser.py + Optuna)", style=filled, fillcolor=lightgreen];
+        local_db [label="Local Database\n(SQLite/MySQL)", shape=cylinder, style=filled, fillcolor=lightgray];
+        local_storage [label="Local Persistent Storage\n(Mounted Volume)", shape=cylinder, style=filled, fillcolor=gold];
+
+        objective_function -> local_run [label="Uses"];
+        local_run -> local_workers [label="Spawns Workers"];
+        local_workers -> local_db [label="Store Results"];
+        local_db -> local_storage [label="File I/O", dir=both];
+    }
+    modeler -> objective_function [label="Defines Inputs/Outputs"];
+    modeler -> local_run [label="Runs Calibration Locally"];
+    developer -> local_workers [label="Supports Debugging & Docker"];
+
+    # Step 3: Remote Calibration (Modeler + Developer + IT
Admin) + subgraph cluster_step3 { + label="3️⃣ Remote Calibration (K8s)"; + style=dashed; + + remote_run [label="Run Full Calibration (`run.py`)\n(No Extra Config)", style=filled, fillcolor=lightblue]; + monitor_results [label="Monitor Progress & Plots", style=filled, fillcolor=lightblue]; + + cloud_controller [label="Controller (K8s Pod) + Optuna", style=filled, fillcolor=lightblue]; + cloud_workers [label="Worker Pods (Auto-Scaling)", style=filled, fillcolor=lightgreen]; + cloud_db [label="Cloud Database\n(PostgreSQL/MySQL)", shape=cylinder, style=filled, fillcolor=lightgray]; + cloud_storage [label="Cloud Storage\n(Azure Blob / S3 / GCS)", shape=cylinder, style=filled, fillcolor=gold]; + + remote_run -> cloud_controller [label="Runs on Cluster"]; + cloud_controller -> cloud_workers [label="Manages Workers"]; + cloud_workers -> cloud_db [label="Store Results"]; + cloud_db -> cloud_storage [label="Persistent Data", dir=both]; + + remote_run -> monitor_results [label="Check Progress"]; + } + modeler -> remote_run [label="Runs Calibration on Cluster"]; + modeler -> monitor_results [label="Monitors Progress"]; + + developer -> cloud_workers [label="Supports Debugging"]; + it_admin -> cloud_controller [label="Configures K8s & Cloud"]; + it_admin -> cloud_storage [label="Configures"]; +} diff --git a/CALIB/docker-compose.yml b/CALIB/docker-compose.yml new file mode 100644 index 0000000..19b26bf --- /dev/null +++ b/CALIB/docker-compose.yml @@ -0,0 +1,28 @@ +version: '3.8' + +services: + mysql: + image: mysql:latest + container_name: optuna-mysql + restart: always + environment: + MYSQL_ROOT_PASSWORD: root + MYSQL_DATABASE: optuna_db + MYSQL_USER: user + MYSQL_PASSWORD: password + ports: + - "3306:3306" + networks: + - optuna_network + + worker: + build: . 
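+    # Scaling note: every worker container connects to the same MySQL-backed Optuna
+    # study, so additional parallel workers can be launched by scaling this service,
+    # e.g. `docker compose up --build --scale worker=4`
+    # (assumes the Compose v2 CLI; the legacy `docker-compose up --scale worker=4`
+    # form accepts the same flag). Scaling works because this service does not set
+    # a fixed `container_name`.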
+ depends_on: + - mysql + environment: + - STORAGE_URL=mysql+pymysql://user:password@optuna-mysql/optuna_db + networks: + - optuna_network + +networks: + optuna_network: diff --git a/CALIB/howto_local_docker.txt b/CALIB/howto_local_docker.txt new file mode 100644 index 0000000..43188f8 --- /dev/null +++ b/CALIB/howto_local_docker.txt @@ -0,0 +1,12 @@ +# cleanup maybe +docker stop optuna-mysql && docker rm optuna-mysql + +# network start +docker network create optuna-network +# db start +docker run -d --name optuna-mysql --network optuna-network -p 3306:3306 -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -e MYSQL_DATABASE=optuna_db mysql:latest +# optuna workers +docker run --rm --name calib-worker --network optuna-network -e STORAGE_URL="mysql+pymysql://root@optuna-mysql/mysql" docker.io/library/calib-worker:latest& + +# peak results +python3 impatient_mysqldocker.py diff --git a/CALIB/impatient.py b/CALIB/impatient.py new file mode 100644 index 0000000..2856989 --- /dev/null +++ b/CALIB/impatient.py @@ -0,0 +1,26 @@ +import json +import subprocess +import sys +from pathlib import Path + +import optuna +from mysql_storage import get_storage_url + +# Define the storage URL +# storage_url = "sqlite:///optuna_study.db" +storage_url = get_storage_url() +study = optuna.create_study(direction="minimize", storage=storage_url, study_name="spatial_demo_calibr8n", load_if_exists=True) + +# Get the best parameters found so far +best_params = study.best_params +print("Best parameters so far:", best_params) + +# Add fixed parameters +best_params.update({"population_size": 500000, "nodes": 20, "timesteps": 500, "initial_infected_fraction": 0.01}) + +# Save best parameters +Path("params_test.json").write_text(json.dumps(best_params, indent=4)) +print("Saved best parameters to params_test.json.") + +# Run the model +subprocess.run([sys.executable, "laser.py", "--plot", "-o", "simulation_result_test.csv", "-p", "params_test.json"], check=True) diff --git a/CALIB/laser.py b/CALIB/laser.py new file mode 100644 index 0000000..01dc467 --- /dev/null +++ b/CALIB/laser.py @@ -0,0 +1,435 @@ +import csv +import json +from pathlib import Path + +import click +import matplotlib.pyplot as plt +import numba +import numpy as np + +from laser_core.demographics.spatialpops import distribute_population_tapered as dpt +from laser_core.laserframe import LaserFrame +from laser_core.migration import gravity + + +@numba.njit(parallel=True) +def fast_reporting(node_id, disease_state, results, current_timestep, nodes): + """ + Optimized reporting function using thread-local storage (TLS) for parallel accumulation. 
+ """ + num_threads = numba.get_num_threads() # Get number of parallel threads + + # Thread-local storage: One array per thread to avoid race conditions + local_s = np.zeros((num_threads, nodes), dtype=np.int32) + local_i = np.zeros((num_threads, nodes), dtype=np.int32) + local_r = np.zeros((num_threads, nodes), dtype=np.int32) + + # Parallel accumulation + for i in numba.prange(len(node_id)): # Loop through all agents in parallel + thread_id = numba.get_thread_id() # Get the current thread ID + node = node_id[i] + state = disease_state[i] + if state == 0: + local_s[thread_id, node] += 1 + elif state == 1: + local_i[thread_id, node] += 1 + elif state == 2: + local_r[thread_id, node] += 1 + + # Merge thread-local storage into the final results + for t in range(num_threads): + for j in numba.prange(nodes): + results[current_timestep, j, 0] += local_s[t, j] + results[current_timestep, j, 1] += local_i[t, j] + results[current_timestep, j, 2] += local_r[t, j] + + +# Define the model +class MultiNodeSIRModel: + """ + A multi-node SIR (Susceptible-Infected-Recovered) disease transmission model. + + Attributes: + params (dict): Configuration parameters for the model. + nodes (int): Number of nodes in the simulation. + population (LaserFrame): Represents the population with agent-level properties. + results (np.ndarray): Stores simulation results for S, I, and R per node. + components (list): List of components (e.g., Migration, Transmission) added to the model. + """ + + def __init__(self, params): + """ + Initializes the SIR model with the given parameters. + + Args: + params (dict): Dictionary containing parameters such as population size, + number of nodes, timesteps, and rates for transmission/migration. + """ + self.params = params + self.nodes = params["nodes"] + self.population = LaserFrame(capacity=params["population_size"], initial_count=params["population_size"]) + + # Define properties + self.population.add_scalar_property("node_id", dtype=np.int32) + self.population.add_scalar_property("disease_state", dtype=np.int32, default=0) # 0=S, 1=I, 2=R + self.population.add_scalar_property("recovery_timer", dtype=np.int32, default=0) + self.population.add_scalar_property("migration_timer", dtype=np.int32, default=0) + + # Initialize population distribution + # node_pops = dps(params["population_size"], self.nodes, frac_rural=0.3) + node_pops = dpt(params["population_size"], self.nodes) + node_ids = np.concatenate([np.full(count, i) for i, count in enumerate(node_pops)]) + np.random.shuffle(node_ids) + self.population.node_id[: params["population_size"]] = node_ids + + # Reporting structure + self.results = np.zeros((params["timesteps"], self.nodes, 3)) # S, I, R per node + + # Components + self.components = [] + + def add_component(self, component): + """ + Adds a component (e.g., Migration, Transmission, Recovery) to the model. + + Args: + component: An instance of a component to be added. + """ + self.components.append(component) + + def step(self): + """ + Advances the simulation by one timestep, updating all components and recording results. + """ + for component in self.components: + component.step() + + def run(self): + """ + Runs the simulation for the configured number of timesteps. + """ + from tqdm import tqdm + + for t in tqdm(range(self.params["timesteps"])): + self.current_timestep = t + self.step() + + def save_results(self, filename): + """ + Saves the simulation results to a CSV file. + + Args: + filename (str): Path to the output file. 
+ """ + with Path(filename).open(mode="w", newline="") as file: + writer = csv.writer(file) + writer.writerow(["Timestep", "Node", "Susceptible", "Infected", "Recovered"]) + for t in range(self.params["timesteps"]): + for node in range(self.nodes): + writer.writerow([t, node, *self.results[t, node]]) + + def plot_results(self): + """ + Plots the prevalence of infected agents over time for all nodes. + """ + plt.figure(figsize=(10, 6)) + for i in [0]: # range(self.nodes): + prevalence = self.results[:, i, 1] / self.results[:, i, :].sum(axis=1) + plt.plot(prevalence, label=f"Node {i}") + plt.title("Prevalence Across All Nodes") + plt.xlabel("Timesteps") + plt.ylabel("Prevalence of Infected Agents") + plt.legend() + plt.show() + + +class MigrationComponent1D: + """ + Handles migration behavior of agents between nodes in the model. + + Attributes: + model (MultiNodeSIRModel): The simulation model instance. + migration_matrix (ndarray): A matrix representing migration probabilities between nodes. + """ + + def __init__(self, model): + """ + Initializes the MigrationComponent. + + Args: + model (MultiNodeSIRModel): The simulation model instance. + """ + self.model = model + + # Set initial migration timers + max_timer = int(1 / model.params["migration_rate"]) + model.population.migration_timer[:] = np.random.randint(1, max_timer + 1, size=model.params["population_size"]) + + self.migration_matrix = self.get_sequential_migration_matrix(model.nodes) + + # Example customization: Disable migration from node 13 to 14 + def break_matrix_node(matrix, from_node, to_node): + matrix[from_node][to_node] = 0 + + break_matrix_node(self.migration_matrix, 13, 14) + + def get_sequential_migration_matrix(self, nodes): + """ + Creates a migration matrix where agents can only migrate to the next sequential node. + + Args: + nodes (int): Number of nodes in the simulation. + + Returns: + ndarray: A migration matrix where migration is allowed only to the next node. + """ + migration_matrix = np.zeros((nodes, nodes)) + for i in range(nodes - 1): + migration_matrix[i, i + 1] = 1.0 + return migration_matrix + + def step(self): + """ + Updates the migration state of the population by determining which agents migrate + and their destinations based on the migration matrix. 
+ """ + node_ids = self.model.population.node_id + + # Decrement migration timers + self.model.population.migration_timer -= 1 + + # Identify agents ready to migrate + migrating_indices = np.where(self.model.population.migration_timer <= 0)[0] + if migrating_indices.size == 0: + return + + # Shuffle migrants and assign destinations based on migration matrix + np.random.shuffle(migrating_indices) + + destinations = np.empty(len(migrating_indices), dtype=int) + for origin in range(self.model.nodes): + origin_mask = node_ids[migrating_indices] == origin + num_origin_migrants = origin_mask.sum() + + if num_origin_migrants > 0: + # Assign destinations proportionally to migration matrix + destination_counts = np.round(self.migration_matrix[origin] * num_origin_migrants).astype(int) + destination_counts = np.maximum(destination_counts, 0) # Clip negative values + if destination_counts.sum() == 0: # No valid destinations + destinations[origin_mask] = origin # Stay in the same node + continue + destination_counts[origin] += num_origin_migrants - destination_counts.sum() # Adjust rounding errors + + # Create ordered destination assignments + destination_indices = np.repeat(np.arange(self.model.nodes), destination_counts) + destinations[origin_mask] = destination_indices[:num_origin_migrants] + + # Update node IDs of migrants + node_ids[migrating_indices] = destinations + + # Reset migration timers for migrated agents + self.model.population.migration_timer[migrating_indices] = np.random.randint( + 1, int(1 / self.model.params["migration_rate"]) + 1, size=migrating_indices.size + ) + + +class MigrationComponent2D: + def __init__(self, model): + """ + Initializes the MigrationComponent. + + Args: + model (MultiNodeSIRModel): The simulation model instance. + """ + self.model = model + + # Set initial migration timers + max_timer = int(1 / model.params["migration_rate"]) + model.population.migration_timer[:] = np.random.randint(1, max_timer + 1, size=model.params["population_size"]) + + self.migration_matrix = self.get_gravity_migration_matrix(model.nodes) + + def get_gravity_migration_matrix(self, nodes): + """ + Generates a gravity-based migration matrix based on population and distances between nodes. + + Args: + nodes (int): Number of nodes in the simulation. + + Returns: + ndarray: A migration matrix where each row represents probabilities of migration to other nodes. 
+ """ + pops = np.array([np.sum(self.model.population.node_id == i) for i in range(nodes)]) + distances = np.ones((nodes, nodes)) - np.eye(nodes) + migration_matrix = gravity(pops, distances, k=1.0, a=0.5, b=0.5, c=2.0) + migration_matrix = migration_matrix / migration_matrix.sum(axis=1, keepdims=True) + return migration_matrix + + def step(self): + # Decrement migration timers + self.model.population.migration_timer -= 1 + + # Identify agents ready to migrate + migrating_indices = np.where(self.model.population.migration_timer <= 0)[0] + if migrating_indices.size == 0: + return + + np.random.shuffle(migrating_indices) + + origins = self.model.population.node_id[migrating_indices] + origin_counts = np.bincount(origins, minlength=self.model.params["nodes"]) + + offset = 0 + + for origin in range(self.model.params["nodes"]): + count = origin_counts[origin] + if count == 0: + continue + + origin_slice = migrating_indices[offset : offset + count] + offset += count + + row = self.migration_matrix[origin] + row_sum = row.sum() + if row_sum <= 0: + continue + + fraction_row = row / row_sum + destination_counts = np.random.multinomial(count, fraction_row) + destinations = np.repeat(np.arange(self.model.params["nodes"]), destination_counts) + self.model.population.node_id[origin_slice] = destinations[:count] + + # Reset migration timers for migrated agents + self.model.population.migration_timer[migrating_indices] = np.random.randint( + 1, int(1 / self.model.params["migration_rate"]) + 1, size=migrating_indices.size + ) + + +class TransmissionComponent: + """ + Handles the disease transmission dynamics within the population. + + Attributes: + model (MultiNodeSIRModel): The simulation model instance. + """ + + def __init__(self, model): + """ + Initializes the TransmissionComponent and infects initial agents. + + Args: + model (MultiNodeSIRModel): The simulation model instance. + """ + self.model = model + + # Infect initial individuals in node 0 + node_pops = np.bincount(self.model.population.node_id, minlength=self.model.nodes) + initial_infected = int(self.model.params["initial_infected_fraction"] * node_pops[0]) + infected_indices = np.random.choice(np.where(self.model.population.node_id == 0)[0], size=initial_infected, replace=False) + self.model.population.disease_state[infected_indices] = 1 + self.model.population.recovery_timer[infected_indices] = np.random.randint(5, 15, size=initial_infected) + + def step(self): + """ + Simulates disease transmission for each node in the current timestep. 
+ """ + last_timestep = self.model.current_timestep # Last recorded timestep + results = self.model.results # Shortcut for clarity + + fast_reporting( + self.model.population.node_id, + self.model.population.disease_state, + self.model.results, + self.model.current_timestep, + self.model.nodes, + ) + + for i in range(self.model.nodes): + # Use precomputed values instead of recalculating + in_node = self.model.population.node_id == i + susceptible = in_node & (self.model.population.disease_state == 0) + num_infected = results[last_timestep, i, 1] + num_susceptible = int(susceptible.sum()) + total_in_node = num_susceptible + num_infected + results[last_timestep, i, 2] + + if total_in_node > 0 and num_infected > 0 and num_susceptible > 0: + infectious_fraction = num_infected / total_in_node + susceptible_fraction = num_susceptible / total_in_node + + new_infections = int(self.model.params["transmission_rate"] * infectious_fraction * susceptible_fraction * total_in_node) + + susceptible_indices = np.where(susceptible)[0] + newly_infected_indices = np.random.choice(susceptible_indices, size=new_infections, replace=False) + + self.model.population.disease_state[newly_infected_indices] = 1 + self.model.population.recovery_timer[newly_infected_indices] = np.random.randint(5, 15, size=new_infections) + + +class RecoveryComponent: + """ + Handles the recovery dynamics of infected individuals in the population. + + Attributes: + model (MultiNodeSIRModel): The simulation model instance. + """ + + def __init__(self, model): + """ + Initializes the RecoveryComponent. + + Args: + model (MultiNodeSIRModel): The simulation model instance. + """ + self.model = model + + def step(self): + """ + Updates the recovery state of infected individuals, moving them to the recovered state + if their recovery timer has elapsed. + """ + infected = self.model.population.disease_state == 1 + self.model.population.recovery_timer[infected] -= 1 + self.model.population.disease_state[(infected) & (self.model.population.recovery_timer <= 0)] = 2 + + +""" +# Parameters +params = { + "population_size": 500_000, + "nodes": 20, + "timesteps": 300, + "initial_infected_fraction": 0.01, + "transmission_rate": 0.15, + "migration_rate": 0.005 +} +""" + + +@click.command() +@click.option("-p", "--params", default="params.json", help="Path to parameters JSON file") +@click.option("-o", "--output", default="simulation_results.csv", help="Output filename for simulation results") +@click.option("--plot", is_flag=True, help="Enable plotting of results") +def main(params, output, plot): + # We expect some part of optuna to set the params.json as a prior step + with Path(params).open("r") as par: + params = json.load(par) + + # Run simulation + model = MultiNodeSIRModel(params) + model.add_component(TransmissionComponent(model)) + model.add_component(RecoveryComponent(model)) + model.add_component(MigrationComponent2D(model)) + model.run() + + # Save raw results for optuna to analyze; this might actually be annoyingly slow across a full calibration. But it's simple + # One can either store hdf5 or do the post-proc 'analyzer' step first and just save that. 
+ print(f"DEBUG Saving {output}") + model.save_results(output) + + # Don't want to plot + if plot: + model.plot_results() + + +if __name__ == "__main__": + main() diff --git a/CALIB/mysql_storage.py b/CALIB/mysql_storage.py new file mode 100644 index 0000000..b0b2a8a --- /dev/null +++ b/CALIB/mysql_storage.py @@ -0,0 +1,11 @@ +import os + + +def get_storage_url(): + MYSQL_USER = "root" + MYSQL_PASSWORD = "root" # noqa: S105 + MYSQL_HOST = os.getenv("MYSQL_HOST", "localhost") # Use localhost if outside Docker + MYSQL_PORT = "3306" + MYSQL_DB = "optuna_db" + + return f"mysql+pymysql://{MYSQL_USER}:{MYSQL_PASSWORD}@{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DB}" diff --git a/CALIB/objective.py b/CALIB/objective.py new file mode 100644 index 0000000..57b2727 --- /dev/null +++ b/CALIB/objective.py @@ -0,0 +1,219 @@ +import json +import subprocess +import sys +import time +from pathlib import Path + +import numpy as np +import pandas as pd +from scipy.signal import find_peaks + +laser_script = Path("laser.py").resolve(strict=True) + + +def evaluate_3_weights_better(results_csv): + """Evaluate if the I series of Node 0 has a second peak, while smoothly guiding towards it.""" + + # Load simulation results + df = pd.read_csv(results_csv) + + # Filter for Node 0 + node_0_data = df[df["Node"] == 0] + + # Extract infected column and total population + infected_series = node_0_data["Infected"].values + total_population = (node_0_data["Susceptible"] + node_0_data["Infected"] + node_0_data["Recovered"]).values + + # Normalize infected series to prevalence + infected_prevalence = infected_series / total_population + + # Find peaks + peaks, _ = find_peaks(infected_prevalence, height=0.02) # Adjust threshold if needed + + # If at least one peak exists, capture the first peak's amplitude + if len(peaks) > 0: + first_peak_amplitude = infected_prevalence[peaks[0]] + else: + first_peak_amplitude = 0 # No peak detected at all + + # **If a second peak exists, prioritize it massively over single peaks** + if len(peaks) >= 2: + second_peak_amplitude = infected_prevalence[peaks[1]] + + # Find the trough between peaks + trough_idx = np.argmin(infected_prevalence[peaks[0] : peaks[1]]) + peaks[0] + trough_amplitude = infected_prevalence[trough_idx] + + # Score based on how close first and second peaks are (ideal ratio = 1) + peak_ratio = abs(second_peak_amplitude / first_peak_amplitude - 1) + + # Measure "trough-iness" as how deep the valley is + trough_depth = first_peak_amplitude - trough_amplitude + + # Penalize high first peaks (we want them low) + first_peak_penalty = max(0, first_peak_amplitude - 0.05) # Penalize if above 5% prevalence + + # **Significantly reduce score if a second peak is found** + score = (peak_ratio - trough_depth + first_peak_penalty) * 0.1 # 10x better + + print(f"✅ Second peak found! {peak_ratio=}, {trough_depth=}, {first_peak_penalty=}, {score=}") + return score + + # **Guiding Single-Peak Solutions Toward Two-Peak Solutions** + # If no second peak exists, return a smoother penalty. + # Lower first peaks are better, but we encourage later infection rebounds. + post_peak_growth = np.max(np.diff(infected_prevalence[peaks[0] :])) if len(peaks) > 0 else 0 + + # Score single-peak solutions: Lower peaks are rewarded, post-peak growth is encouraged + penalty = (first_peak_amplitude * 50) - post_peak_growth * 100 # Encourage later rebounds + + print(f"❌ No second peak yet. 
{first_peak_amplitude=}, {post_peak_growth=}, {penalty=}") + return max(5, penalty) # Ensure values stay smooth and don't jump directly to "10" + + +def evaluate_3_weights(results_csv): + """Evaluate if the I series of Node 0 has a second peak, while penalizing high first peaks.""" + + # Load simulation results + df = pd.read_csv(results_csv) + + # Filter for Node 0 + node_0_data = df[df["Node"] == 0] + + # Extract infected column and total population + infected_series = node_0_data["Infected"].values + total_population = node_0_data["Susceptible"] + node_0_data["Infected"] + node_0_data["Recovered"] + + # Normalize infected series to prevalence + infected_prevalence = infected_series / total_population + + # Find peaks + peaks, _ = find_peaks(infected_prevalence, height=0.04) # Adjust threshold as needed + + # If at least one peak exists, capture the first peak's amplitude + if len(peaks) > 0: + infected_prevalence = infected_prevalence.to_numpy() # Ensure NumPy indexing + first_peak_amplitude = infected_prevalence[peaks[0]] + else: + first_peak_amplitude = 0 # No peak detected at all + + # If fewer than 2 peaks, return a continuous penalty rather than an early exit + if len(peaks) < 2: + return 10 + first_peak_amplitude * 100 # Penalize high first peaks more + + # Extract the second peak's amplitude + second_peak_amplitude = infected_prevalence[peaks[1]] + + # Find the trough between peaks + trough_idx = np.argmin(infected_prevalence[peaks[0] : peaks[1]]) + peaks[0] + trough_amplitude = infected_prevalence[trough_idx] + + # Score based on how close first and second peaks are (ideal ratio = 1) + peak_ratio = abs(second_peak_amplitude / first_peak_amplitude - 1) + + # Measure "trough-iness" as how deep the valley is + trough_depth = first_peak_amplitude - trough_amplitude + + # Penalize high first peaks (we want them low) + first_peak_penalty = max(0, first_peak_amplitude - 0.05) # Penalize if above 5% prevalence + + # Final score (lower is better) + score = peak_ratio - trough_depth + first_peak_penalty + + print(f"{peak_ratio=}, {trough_depth=}, {first_peak_penalty=}, {score=}") + + return score + + +def evaluate_second_peak_no_trough(results_csv): + """Evaluate if the I series of Node 0 has a second peak based on amplitude ratio.""" + + # Load simulation results + df = pd.read_csv(results_csv) + + # Filter for Node 0 + node_0_data = df[df["Node"] == 0] + + # Compute total population at each timestep + total_population = node_0_data["Susceptible"] + node_0_data["Infected"] + node_0_data["Recovered"] + + # Normalize infected column + infected_series = node_0_data["Infected"] / total_population + + # Find peaks with a minimum height threshold of 0.04 + peaks, _ = find_peaks(infected_series, height=0.04) # Adjust height threshold to filter out noise + + if len(peaks) >= 2: + # Get the amplitudes of the first and second peaks + first_peak_amplitude = infected_series.iloc[peaks[0]] + second_peak_amplitude = infected_series.iloc[peaks[1]] + # Calculate the ratio of the second peak to the first peak's amplitude + amplitude_ratio = second_peak_amplitude / first_peak_amplitude + + # We want the amplitudes to be as close as possible, so minimize the absolute difference from 1 + return -abs(amplitude_ratio - 1) # The closer the ratio is to 1, the better + + return float("inf") # Return a high value if there are not enough peaks + + +# Paths to input/output files +PARAMS_FILE = "params.json" +RESULTS_FILE = "simulation_results.csv" + + +def objective(trial): + """Optuna objective function that runs 
laser.py with trial parameters and evaluates results."""
+
+    Path(RESULTS_FILE).unlink(missing_ok=True)
+
+    # Suggest values for calibration
+    migration_rate = trial.suggest_float("migration_rate", 0.0001, 0.01)
+    transmission_rate = trial.suggest_float("transmission_rate", 0.05, 0.5)
+    # migration_rate = trial.suggest_float("migration_rate", 0.004, 0.004)
+    # transmission_rate = trial.suggest_float("transmission_rate", 0.145, 0.145)
+
+    # Set up parameters
+    params = {
+        "population_size": 500_000,
+        "nodes": 20,
+        "timesteps": 500,
+        "initial_infected_fraction": 0.01,
+        "transmission_rate": transmission_rate,
+        "migration_rate": migration_rate,
+    }
+
+    # Write parameters to JSON file
+    with Path(PARAMS_FILE).open("w") as f:
+        json.dump(params, f, indent=4)
+
+    def get_docker_runstring():
+        # cmd = f"docker run --rm -v .:/app/shared docker.io/jbloedow/my-laser-app:latest"
+        cmd = "docker run --rm -v .:/app/shared my-laser-app:latest"
+        return cmd.split()
+
+    def get_native_runstring():
+        return [sys.executable, str(laser_script)]
+
+    print(f"Will be looking for {RESULTS_FILE}")
+    # Run laser.py as a subprocess
+    try:
+        # Run the model 4 times and collect results
+        scores = []
+        for _ in range(4):
+            # subprocess.run(get_docker_runstring(), check=True)
+            # subprocess.run(["python3", "laser.py"], check=True)
+            subprocess.run(get_native_runstring(), check=True)
+
+            # Wait until RESULTS_FILE is written
+            while not Path(RESULTS_FILE).exists():
+                time.sleep(0.1)
+
+            score = evaluate_3_weights(RESULTS_FILE)  # Evaluate results
+            scores.append(score)
+
+        # Return the average score
+        return np.mean(scores)
+
+    except subprocess.CalledProcessError as e:
+        print(f"Error running laser.py: {e}")
+        return float("inf")  # Penalize failed runs
diff --git a/CALIB/optuna_review_contour.py b/CALIB/optuna_review_contour.py
new file mode 100644
index 0000000..6b8c961
--- /dev/null
+++ b/CALIB/optuna_review_contour.py
@@ -0,0 +1,47 @@
+import matplotlib.pyplot as plt
+import optuna
+
+# Load your study
+study = optuna.load_study(study_name="spatial_demo_calibr8n", storage="sqlite:///optuna_study.db")
+
+
+def optuna_plots(study):
+    # Plot parameter space
+    optuna.visualization.plot_parallel_coordinate(study).show()
+    optuna.visualization.plot_contour(study, params=["transmission_rate", "migration_rate"]).show()
+
+
+def custom_plot(study):
+    # Get trials and filter completed ones
+    trials = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]
+
+    # Extract parameter values and scores
+    transmission_rates = [t.params["transmission_rate"] for t in trials]
+    migration_rates = [t.params["migration_rate"] for t in trials]
+    scores = [t.value for t in trials]  # Optuna minimizes by default; invert if needed
+
+    # Scatter plot: color by score
+    plt.figure(figsize=(8, 6))
+    sc = plt.scatter(transmission_rates, migration_rates, c=scores, cmap="viridis", edgecolors="k", alpha=0.8)
+    plt.colorbar(sc, label="Objective Score")
+    plt.xlabel("Transmission Rate")
+    plt.ylabel("Migration Rate")
+    plt.title("Optuna Parameter Search")
+    plt.show()
+
+
+def animate(trials):
+    fig, ax = plt.subplots(figsize=(8, 6))
+
+    for i, t in enumerate(trials):
+        plt.scatter(t.params["transmission_rate"], t.params["migration_rate"], color=plt.cm.viridis(i / len(trials)), alpha=0.7)
+
+    plt.xlabel("Transmission Rate")
+    plt.ylabel("Migration Rate")
+    plt.title("Optuna Search Over Time")
+    plt.show()
+
+
+optuna_plots(study)
+# custom_plot( study )
+# animate( study.trials )
diff --git a/CALIB/params.json b/CALIB/params.json
new file mode 100644
index 0000000..4ad58f6
--- /dev/null
+++ b/CALIB/params.json
@@ -0,0 +1,8 @@
+{
+    "population_size": 500000,
+    "nodes": 20,
+    "timesteps": 500,
+    "initial_infected_fraction": 0.01,
+    "transmission_rate": 0.13735255136692343,
+    "migration_rate": 0.00970645980267521
+}
diff --git a/CALIB/plot_opt_hist.py b/CALIB/plot_opt_hist.py
new file mode 100644
index 0000000..6703d74
--- /dev/null
+++ b/CALIB/plot_opt_hist.py
@@ -0,0 +1,6 @@
+import optuna
+from optuna.visualization import plot_optimization_history
+
+study = optuna.load_study(study_name="spatial_demo_calibr8n", storage="sqlite:///optuna_study.db")
+fig = plot_optimization_history(study)
+fig.show()
diff --git a/CALIB/requirements.txt b/CALIB/requirements.txt
new file mode 100644
index 0000000..4b4b8ef
--- /dev/null
+++ b/CALIB/requirements.txt
@@ -0,0 +1,6 @@
+laser-core
+tqdm
+optuna
+scipy
+pymysql
+cryptography
diff --git a/CALIB/run.py b/CALIB/run.py
new file mode 100644
index 0000000..863b174
--- /dev/null
+++ b/CALIB/run.py
@@ -0,0 +1,27 @@
+import subprocess
+import sys
+from pathlib import Path
+
+import optuna
+from objective import objective
+
+# Run Optuna optimization
+# storage_url = get_storage_url()
+storage_url = "sqlite:///optuna_study.db"  # SQLite file-based DB
+# You can use non-default samplers if you want; we'll go with the default
+# sampler = optuna.samplers.CmaEsSampler()
+# optuna.delete_study(study_name="spatial_demo_calibr8n", storage=storage_url)
+study = optuna.create_study(
+    # sampler=sampler,
+    direction="minimize",
+    storage=storage_url,
+    study_name="spatial_demo_calibr8n",
+    load_if_exists=True,
+)
+study.optimize(objective, n_trials=25)  # n_trials is how many more trials; it will add to an existing study if it finds it in the db.
+
+# Print the best parameters
+print("Best parameters:", study.best_params)
+
+laser_script = Path("impatient.py").resolve(strict=True)
+subprocess.run([sys.executable, str(laser_script)], check=True)
diff --git a/CALIB/worker.py b/CALIB/worker.py
new file mode 100644
index 0000000..896649c
--- /dev/null
+++ b/CALIB/worker.py
@@ -0,0 +1,22 @@
+import os
+
+import optuna
+from objective import objective  # The objective function now runs inside the worker
+
+# Connect to the same storage as the controller
+# storage_url = get_storage_url()
+
+# Load the study
+# study = optuna.load_study(study_name="spatial_demo_calibr8n", storage=storage_url)
+
+storage_url = os.getenv("STORAGE_URL", "mysql+pymysql://user:password@optuna-mysql/optuna_db")
+study_name = "spatial_demo_calib_mar14"
+
+try:
+    study = optuna.load_study(study_name=study_name, storage=storage_url)
+except KeyError:
+    print(f"Study '{study_name}' not found. Creating a new study.")
+    study = optuna.create_study(study_name=study_name, storage=storage_url)
+
+# Run trials (each worker runs one or more trials)
+study.optimize(objective, n_trials=5)  # Adjust per worker
diff --git a/pyproject.toml b/pyproject.toml
index 6441168..7291a14 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -76,6 +76,7 @@ ignore = [
     "S101",  # flake8-bandit assert
     "S308",  # flake8-bandit suspicious-mark-safe-usage
     "E501",  # pycodestyle line-too-long
+    "S603"  # subprocess-without-shell-equals-true
 ]
 line-length = 140
 select = [