diff --git a/docs/make.jl b/docs/make.jl
index 533d734f..36bbcc1d 100644
--- a/docs/make.jl
+++ b/docs/make.jl
@@ -24,6 +24,7 @@ makedocs(
     pages = [
         "Home" => "index.md",
         "Getting Started" => "quickstart.md",
+        "Backends" => "backends.md",
         "ClimaAtmos Setup Guide" => "atmos_setup_guide.md",
         "Emulate and Sample" => "emulate_sample.md",
         "API" => "api.md",
diff --git a/docs/src/api.md b/docs/src/api.md
index 04393628..13f17585 100644
--- a/docs/src/api.md
+++ b/docs/src/api.md
@@ -10,8 +10,14 @@ ClimaCalibrate.observation_map
 ## Backend Interface
 
 ```@docs
-ClimaCalibrate.get_backend
 ClimaCalibrate.calibrate
+ClimaCalibrate.JuliaBackend
+ClimaCalibrate.WorkerBackend
+ClimaCalibrate.SlurmManager
+ClimaCalibrate.DerechoBackend
+ClimaCalibrate.CaltechHPCBackend
+ClimaCalibrate.ClimaGPUBackend
+ClimaCalibrate.get_backend
 ClimaCalibrate.model_run
 ClimaCalibrate.module_load_string
 ```
@@ -44,4 +50,5 @@ ClimaCalibrate.get_param_dict
 ClimaCalibrate.path_to_iteration
 ClimaCalibrate.path_to_ensemble_member
 ClimaCalibrate.path_to_model_log
+ClimaCalibrate.parameter_path
 ```
diff --git a/docs/src/backends.md b/docs/src/backends.md
new file mode 100644
index 00000000..54075763
--- /dev/null
+++ b/docs/src/backends.md
@@ -0,0 +1,14 @@
+## Backends
+
+ClimaCalibrate can scale calibrations on different distributed computing environments, referred to as backends. Most of these are high-performance computing clusters.
+
+Each backend has an associated `calibrate(::AbstractBackend, ...)` dispatch, which initializes and runs the calibration on the given backend.
+
+The following backends are currently supported:
+
+- [`JuliaBackend`](@ref)
+- [`WorkerBackend`](@ref)
+- [`CaltechHPCBackend`](@ref)
+- [`ClimaGPUBackend`](@ref)
+- [`DerechoBackend`](@ref)
+
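+To run on a specific backend, pass the backend type as the first argument to `calibrate`. A minimal sketch, assuming `ensemble_size`, `n_iterations`, `observations`, `noise`, `prior`, and `output_dir` have already been defined:
+
+```julia
+import ClimaCalibrate
+
+config = ClimaCalibrate.ExperimentConfig(
+    n_iterations,
+    ensemble_size,
+    observations,
+    noise,
+    prior,
+    output_dir,
+)
+eki = ClimaCalibrate.calibrate(ClimaCalibrate.JuliaBackend, config)
+```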
diff --git a/docs/src/index.md b/docs/src/index.md
index f6d7248a..30147332 100644
--- a/docs/src/index.md
+++ b/docs/src/index.md
@@ -1,9 +1,11 @@
 # ClimaCalibrate.jl
 
 ClimaCalibrate.jl is a toolkit for developing scalable and reproducible model
-calibration pipelines using CalibrateEmulateSample.jl with minimal boilerplate.
+calibration pipelines using [EnsembleKalmanProcesses.jl](https://github.com/CliMA/EnsembleKalmanProcesses.jl/) with minimal boilerplate.
 
-To use this framework, component models (and the coupler) define their own versions of the functions provided in the interface.
+This documentation assumes basic familiarity with inverse problems and [Ensemble Kalman Inversion](https://clima.github.io/EnsembleKalmanProcesses.jl/dev/ensemble_kalman_inversion/#eki) in particular.
+
+To use this framework, component models define their own versions of the functions provided in the interface.
 
 Calibrations can either be run using just Julia, the Caltech central cluster, NCAR Derecho, or CliMA's GPU server.
 
-For more information, see our Getting Started page.
+For more information, see our [Getting Started page](https://clima.github.io/ClimaCalibrate.jl/dev/quickstart/).
diff --git a/docs/src/interface.md b/docs/src/interface.md
new file mode 100644
index 00000000..fe98dc8c
--- /dev/null
+++ b/docs/src/interface.md
@@ -0,0 +1,54 @@
+# ClimaCalibrate
+
+ClimaCalibrate provides a framework for running component model calibrations using EnsembleKalmanProcesses. This way, artifacts for calibration experiments can be stored.
+
+To use this framework, individual models (or the coupler) will define their own versions of the functions provided in the interface (`get_config`, `get_forward_model`, and `run_forward_model`).
+
+Calibrations can either be run using pure Julia or, if required, an sbatch pipeline for the Caltech Resnick cluster.
+
+The framework has three main interface components:
+
+**EKP**: Contains the functions for initializing calibration, updating ensembles, and additional emulate/sample steps.
+
+**Model**: Contains hooks/function stubs for component models to integrate. This consists of three functions:
+- `forward_model = get_forward_model(Val{experiment_id})`: Dispatches on the experiment ID to find the right component model.
+
+- `config = get_config(physical_model, experiment_id, iteration, member)`: Gets the configuration for the model to run.
+
+- `run_forward_model(physical_model, config)`: Runs the forward model, which is meant to save output to a file.
+
+**Slurm**: Provides utilities for running experiments on the central cluster, as well as file and error handling.
+It would be good to separate this from the Julia package, since it is largely specific to the central cluster.
+
+# Component Model
+
+## Model interface
+The model must provide implementations of the following function stubs. These will be common across experiments for a single component model.
+
+`physical_model = get_physical_model(Val{experiment_id})`: Dispatches on the experiment ID to find the right component model.
+
+`config = get_config(physical_model, experiment_id, iteration, member)`: Gets the configuration for the model to run.
+
+`run_forward_model(physical_model, config)`: Runs the forward model, which is meant to save output to a file.
+
+## Experiments
+This defines an individual experiment to work with the model interface and ClimaCalibrate.
+An experiment should be self-documented and contained.
+
+Experiments need to have the following:
+- **EKP configuration YAML**: This holds the configuration for the experiment, including paths to other experiment metadata such as the prior distribution file. This contains:
+    - Path to the prior distribution
+    - Path to truth data and noise JLD2 files
+    - Ensemble size
+    - Number of iterations
+    - Output directory
+    - In the future, other EKP configuration arguments (Unscented etc.)
+- **Observational data and noise**: Serialized to JLD2.
+- **Prior distribution file**: TOML format. Distributions should generally use the [constrained_gaussian](https://clima.github.io/EnsembleKalmanProcesses.jl/dev/API/ParameterDistributions/#EnsembleKalmanProcesses.ParameterDistributions.constrained_gaussian) function; see the sketch after this list.
+- **Project.toml and compat**: Setting compat requirements is important for reproducibility, and allows for additional imports needed in the observation map.
+- **Observation map**: A function `observation_map` which computes the observation map for a full ensemble.
+    - The `observation_map` function can live in a file in the experiment folder which is `include`d in the calibration script. Alternatively, all observation maps for a given model can be kept in the model interface to avoid using `include`. However, this has the drawback of adding unnecessary dependencies for a given experiment.
+- **Model configuration file** (optional): May be required by the model interface. This should inherit values like the output directory from the EKP config if applicable.
+- **Script for generating the observational data** (optional): Ideal for reproducibility, especially for perfect model scenarios.
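+
+A minimal sketch of building such a prior in Julia before serializing it to TOML (the parameter name is taken from the example experiment; the bounds are illustrative):
+
+```julia
+using EnsembleKalmanProcesses.ParameterDistributions
+
+# A positive parameter expected to be near 4.7
+prior = constrained_gaussian("coefficient_a_m_businger", 4.7, 1.0, 0.0, Inf)
+```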
diff --git a/docs/src/quickstart.md b/docs/src/quickstart.md
index 20743979..a5dfda6b 100644
--- a/docs/src/quickstart.md
+++ b/docs/src/quickstart.md
@@ -1,53 +1,53 @@
 # Getting Started
 
-First, make sure your system meets the dependencies of [CalibrateEmulateSample.jl](https://clima.github.io/CalibrateEmulateSample.jl/dev/installation_instructions/).
-You can run calibrations a cluster that supports Slurm or on your local machine.
+Every calibration requires the following information:
+- A forward model
+- Observational data
+- Prior parameter distribution
 
-A good way to get started is to run the example experiment, `surface_fluxes_perfect_model`, which uses the [SurfaceFluxes.jl](https://github.com/CliMA/SurfaceFluxes.jl) package to generate a physical model that calculates the Monin Obukhov turbulent surface fluxes based on idealized atmospheric and surface conditions. Since this is a "perfect model" example, the same model is used to generate synthetic observations using its default parameters and a small amount of noise. These synthetic observations are considered to be the ground truth, which is used to assess the model ensembles' performance when parameters are drawn from the prior parameter distributions.
-
-It is a perfect-model calibration, serving as a test case for the initial pipeline.
-By default, it runs 10 ensemble members for 6 iterations. Further details can be found in the experiment folder, `experiments/surace_fluxes_perfect_model`.
-
-## Local Machine
-
-To run the example experiment on your local machine, first open your REPL with the proper project:
-`julia --project=experiments/surface_fluxes_perfect_model`
-
-Next, run the following code:
-```julia
-import ClimaCalibrate
+To run a calibration, you will need to implement two functions:
+- [`forward_model(iteration, member)`](@ref): Run the forward model, saving model output to disk.
+- [`observation_map(iteration)`](@ref): Return the full ensemble's observation map, transforming forward model output into the same space as the observational data.
 
-experiment_dir = dirname(Base.active_project())
+Since these functions only have access to the iteration and member numbers but need to set parameters and save data, there are some helpful hooks, demonstrated in the sketch after this list:
+- [`path_to_ensemble_member`](@ref) returns the ensemble member's output directory, which can be used to set the forward model's output directory.
+- [`parameter_path`](@ref) returns the ensemble member's parameter file, which can be loaded in via TOML or passed to ClimaParams.
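+
+A minimal sketch of a forward model using these hooks (`run_my_model` is a hypothetical stand-in for your model's entry point, and `output_dir` is assumed to be defined):
+
+```julia
+import ClimaCalibrate
+import TOML
+
+function ClimaCalibrate.forward_model(iteration, member)
+    member_dir = ClimaCalibrate.path_to_ensemble_member(output_dir, iteration, member)
+    params = TOML.parsefile(ClimaCalibrate.parameter_path(output_dir, iteration, member))
+    # Run the model with this member's parameters, saving output into `member_dir`
+    run_my_model(params; output_dir = member_dir)
+end
+```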
 
-# Generate observational data and include observational map + model interface
-include(joinpath(experiment_dir, "generate_data.jl"))
-include(joinpath(experiment_dir, "observation_map.jl"))
-include(joinpath(experiment_dir, "model_interface.jl"))
+# Example Calibration
 
-eki = ClimaCalibrate.calibrate(JuliaBackend, experiment_dir)
-include(joinpath(experiment_dir, "postprocessing.jl"))
-```
+A good way to get started is to run the example experiment, `surface_fluxes_perfect_model`, which uses the [SurfaceFluxes.jl](https://github.com/CliMA/SurfaceFluxes.jl) package to generate a physical model that calculates the Monin Obukhov turbulent surface fluxes based on idealized atmospheric and surface conditions.
+Since this is a "perfect model" example, the same model is used to generate synthetic observations using its default parameters and a small amount of noise. These synthetic observations are considered to be the ground truth, which is used to assess the model ensembles' performance when parameters are drawn from the prior parameter distributions.
 
-## HPC Cluster
-This method will queue Julia processes to run on your slurm cluster.
+It is a perfect-model calibration, using its own output as observational data.
+By default, it runs 20 ensemble members for 6 iterations.
+This example can be run on the most common backend, the WorkerBackend, with the following script:
 
-To run this experiment:
-1. Log onto the Caltech HPC
-2. Clone ClimaCalibrate.jl and `cd` into the repository.
-3. Start julia: `julia --project=experiments/surace_fluxes_perfect_model`
-4. Run the following:
 ```julia
-import ClimaCalibrate: CaltechHPCBackend, calibrate
-
-experiment_dir = dirname(Base.active_project())
-
-include(joinpath(experiment_dir, "generate_data.jl"))
-model_interface = joinpath(experiment_dir, "model_interface.jl")
-include(joinpath(experiment_dir, "observation_map.jl"))
-eki = calibrate(CaltechHPCBackend, experiment_dir;
-    time_limit = 3, model_interface)
-
-include(joinpath(experiment_dir, "postprocessing.jl"))
+using ClimaCalibrate, Distributed
+
+addprocs(SlurmManager(5))
+include(joinpath(pkgdir(ClimaCalibrate), "test", "sf_calibration_utils.jl"))
+
+eki = calibrate(
+    WorkerBackend,
+    ensemble_size,
+    n_iterations,
+    observation,
+    variance,
+    prior,
+    output_dir,
+)
+
+test_sf_calibration_output(eki, prior, observation)
+
+theta_star_vec =
+    (; coefficient_a_m_businger = 4.7, coefficient_a_h_businger = 4.7)
+
+convergence_plot(
+    eki,
+    prior,
+    theta_star_vec,
+    ["coefficient_a_m_businger", "coefficient_a_h_businger"],
+)
+
+g_vs_iter_plot(eki)
 ```
-
-New experiments should be defined within the component model repos (in this case, `SurfaceFluxes.jl`), so that the internals of `ClimaCalibrate.jl` do not explicitly depend on component models.
diff --git a/experiments/surface_fluxes_perfect_model/postprocessing.jl b/experiments/surface_fluxes_perfect_model/postprocessing.jl
index 7680b484..83eecaa5 100644
--- a/experiments/surface_fluxes_perfect_model/postprocessing.jl
+++ b/experiments/surface_fluxes_perfect_model/postprocessing.jl
@@ -149,8 +149,7 @@ for iter in 0:N_iter
         model_config["toml"] = [
             joinpath(
                 pkg_dir,
-                ClimaCalibrate.path_to_ensemble_member(output_dir, iter, i),
-                "parameters.toml",
+                ClimaCalibrate.parameter_path(output_dir, iter, i),
             ),
         ]
         ustar_mod =
diff --git a/src/backends.jl b/src/backends.jl
index 94b7e22c..323139c7 100644
--- a/src/backends.jl
+++ b/src/backends.jl
@@ -9,16 +9,43 @@ export HPCBackend, ClimaGPUBackend, DerechoBackend, CaltechHPCBackend
 
 abstract type AbstractBackend end
 
+"""
+    JuliaBackend
+
+The simplest backend, used to run a calibration in Julia without any parallelization.
+"""
 struct JuliaBackend <: AbstractBackend end
 
 abstract type HPCBackend <: AbstractBackend end
 abstract type SlurmBackend <: HPCBackend end
 
+"""
+    CaltechHPCBackend
+
+Used for Caltech's [high-performance computing cluster](https://www.hpc.caltech.edu/).
+"""
 struct CaltechHPCBackend <: SlurmBackend end
+
+"""
+    ClimaGPUBackend
+
+Used for CliMA's private GPU server.
+"""
 struct ClimaGPUBackend <: SlurmBackend end
 
+"""
+    DerechoBackend
+
+Used for NSF NCAR's [Derecho supercomputing system](https://ncar-hpc-docs.readthedocs.io/en/latest/compute-systems/derecho/).
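+Note that Derecho is scheduled with PBS rather than Slurm, which is why `DerechoBackend` subtypes `HPCBackend` directly instead of `SlurmBackend`.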
+""" struct DerechoBackend <: HPCBackend end +""" + WorkerBackend + +Used to run calibrations on Distributed.jl's workers. +For use on a Slurm cluster, see [`SlurmManager`](@ref). +""" struct WorkerBackend <: AbstractBackend end """ @@ -28,6 +55,7 @@ Get ideal backend for deploying forward model runs. Each backend is found via `gethostname()`. Defaults to JuliaBackend if none is found. """ function get_backend() + # TODO: Add WorkerBackend as default if there are multiple workers HOSTNAMES = [ (r"^clima.gps.caltech.edu$", ClimaGPUBackend), (r"^login[1-4].cm.cluster$", CaltechHPCBackend), @@ -112,9 +140,13 @@ function calibrate( (e, catch_backtrace()) for i in 0:(n_iterations - 1) @info "Running iteration $i" - pmap(1:ensemble_size; retry_delays = 0, on_error) do m - forward_model(i, m) - @info "Completed member $m" + foreach(1:ensemble_size) do m + try + forward_model(i, m) + @info "Completed member $m" + catch e + on_error(e) + end end G_ensemble = observation_map(i) save_G_ensemble(config, i, G_ensemble) diff --git a/src/ekp_interface.jl b/src/ekp_interface.jl index e194f0eb..ff93903a 100644 --- a/src/ekp_interface.jl +++ b/src/ekp_interface.jl @@ -3,11 +3,12 @@ import JLD2 import Random using Distributions import EnsembleKalmanProcesses as EKP -using EnsembleKalmanProcesses.ParameterDistributions -using EnsembleKalmanProcesses.TOMLInterface +import EnsembleKalmanProcesses.ParameterDistributions as PD +import EnsembleKalmanProcesses.TOMLInterface as TI export ExperimentConfig, get_prior, initialize, update_ensemble, save_G_ensemble -export path_to_ensemble_member, path_to_model_log, path_to_iteration +export path_to_ensemble_member, + path_to_model_log, path_to_iteration, parameter_path """ ExperimentConfig( @@ -30,7 +31,7 @@ Base.@kwdef struct ExperimentConfig ensemble_size::Integer observations::Any noise::Any - prior::ParameterDistribution + prior::PD.ParameterDistribution output_dir::Any end @@ -85,7 +86,18 @@ end Return the path to an ensemble member's directory for a given iteration and member number. """ path_to_ensemble_member(output_dir, iteration, member) = - EKP.TOMLInterface.path_to_ensemble_member(output_dir, iteration, member) + TI.path_to_ensemble_member(output_dir, iteration, member) + +const DEFAULT_PARAMETER_FILE = "parameters.toml" +""" + parameter_path(output_dir, iteration, member) + +Return the path to an ensemble member's parameter file. +""" +parameter_path(output_dir, iteration, member) = joinpath( + path_to_ensemble_member(output_dir, iteration, member), + DEFAULT_PARAMETER_FILE, +) """ path_to_model_log(output_dir, iteration, member) @@ -119,8 +131,8 @@ end function get_prior(param_dict::AbstractDict; names = nothing) names = isnothing(names) ? keys(param_dict) : names - prior_vec = [get_parameter_distribution(param_dict, n) for n in names] - prior = combine_distributions(prior_vec) + prior_vec = [TI.get_parameter_distribution(param_dict, n) for n in names] + prior = PD.combine_distributions(prior_vec) return prior end @@ -131,12 +143,10 @@ Generates a dictionary for parameters based on the specified distribution, assum If `names` is not provided, the distribution's names will be used. 
""" function get_param_dict( - distribution::PD; + distribution::PDD; names = distribution.name, -) where {PD <: ParameterDistributions.ParameterDistribution} - return Dict( - name => Dict{Any, Any}("type" => "float") for name in distribution.name - ) +) where {PDD <: PD.ParameterDistribution} + return Dict(name => Dict{Any, Any}("type" => "float") for name in names) end """ @@ -294,12 +304,12 @@ Save EKI state and parameters. Helper function for [`initialize`](@ref) and [`up """ function save_eki_and_parameters(eki, output_dir, iteration, prior) param_dict = get_param_dict(prior) - save_parameter_ensemble( + TI.save_parameter_ensemble( EKP.get_u_final(eki), prior, param_dict, output_dir, - "parameters.toml", + DEFAULT_PARAMETER_FILE, iteration, ) diff --git a/src/workers.jl b/src/workers.jl index 7ae2915e..f16b5c9d 100644 --- a/src/workers.jl +++ b/src/workers.jl @@ -45,6 +45,16 @@ worker_cookie() = begin end worker_arg() = `--worker=$(worker_cookie())` +""" + SlurmManager(ntasks=get(ENV, "SLURM_NTASKS", 1)) + +The ClusterManager for Slurm clusters, taking in the number of tasks to request with `srun`. +To execute the `srun` command, run `addprocs(SlurmManager(ntasks))` +Keyword arguments can be passed to `srun`: `addprocs(SlurmManager(ntasks), gpus_per_task=1)` +By default the workers will inherit the running Julia environment. +To run a calibration, call `calibrate(WorkerBackend, ...)` +To run functions on a worker, call `remotecall(func, worker_id, args...)` +""" struct SlurmManager <: ClusterManager ntasks::Integer @@ -78,141 +88,180 @@ function Distributed.launch( exehome = params[:dir] exename = params[:exename] exeflags = params[:exeflags] + env = Dict{String, String}(params[:env]) + + # Taken from Distributed.LocalManager + propagate_env_vars!(env) + + worker_args = parse_worker_params(params) + # Get job file location from parameter dictionary + job_directory = setup_job_directory(exehome, params) + + output_path_func = configure_output_args!(job_directory, worker_args) + + ntasks = sm.ntasks + jobname = worker_jobname() + srun_cmd = `srun -J $jobname -n $ntasks -D $exehome $(worker_args) $exename $exeflags $(worker_arg())` + + @info "Starting SLURM job $jobname: $srun_cmd" + srun_pid = open(addenv(srun_cmd, env)) + + # Wait for workers to start + t_start = time() + for i in 0:(ntasks - 1) + output_file = output_path_func(lpad(i, 4, "0")) + worker_config = poll_worker_startup(output_file, i, t_start, srun_pid) + # Add configs to `instances_arr` for internal Distributed use + push!(instances_arr, worker_config) + notify(c) + end +end + +""" + parse_worker_params(params::Dict) - # TODO: Inherit full env - exeflags = exeflags == `` ? "--project=$(project_dir())" : exeflags +Parse params into string arguments for the worker launch command. +Uses all keys that are not in `Distributed.default_addprocs_params()`. 
+""" +function parse_worker_params(params::Dict) stdkeys = keys(Distributed.default_addprocs_params()) - slurm_params = + worker_params = filter(x -> (!(x[1] in stdkeys) && x[1] != :job_file_loc), params) - srunargs = [] + worker_args = [] - for (k, v) in slurm_params + for (k, v) in worker_params if length(string(k)) == 1 - push!(srunargs, "-$k") + push!(worker_args, "-$k") if length(v) > 0 - push!(srunargs, v) + push!(worker_args, v) end else k2 = replace(string(k), "_" => "-") if length(v) > 0 - push!(srunargs, "--$k2=$v)") + push!(worker_args, "--$k2=$v)") else - push!(srunargs, "--$k2") + push!(worker_args, "--$k2") end end end + return worker_args +end - # Get job file location from parameter dictionary - job_file_loc = joinpath(exehome, get(params, :job_file_loc, ".")) - # Make directory if not already made - if !isdir(job_file_loc) - mkdir(job_file_loc) - end - # Check for given output file name - jobname = "julia-$(getpid())" +worker_jobname() = "julia-$(getpid())" - default_template = ".$jobname-$(trunc(Int, Base.time() * 10))" - default_output(x) = joinpath(job_file_loc, "$default_template-$x.out") +function configure_output_args!(job_directory, worker_args) + default_template = ".$(worker_jobname())-$(trunc(Int, Base.time() * 10))" + default_output(x) = joinpath(job_directory, "$default_template-$x.out") - # Set output name - has_output_name = - any(arg -> occursin("-o", arg) || occursin("--output", arg), srunargs) - if has_output_name + if any(arg -> occursin("-o", arg) || occursin("--output", arg), worker_args) # if has_output_name, ensure there is only one output arg locs = findall( x -> startswith(x, "-o") || startswith(x, "--output"), - srunargs, + worker_args, ) length(locs) > 1 && - error("Slurm Error: Multiple output files specified: $srunargs") - job_output_file = srunargs[locs[1] + 1] + error("Slurm Error: Multiple output files specified: $worker_args") + job_output_file = worker_args[locs[1] + 1] + return i -> job_output_file else # Slurm interpolates %4t to the task ID padded with up to four zeros - push!(srunargs, "-o", default_output("%4t")) + push!(worker_args, "-o", default_output("%4t")) + return default_output end +end - ntasks = sm.ntasks - srun_cmd = `srun -J $jobname -n $ntasks -D $exehome $(srunargs) $exename $exeflags $(worker_arg())` +function setup_job_directory(exehome::String, params::Dict) + job_directory = joinpath(exehome, get(params, :job_file_loc, ".")) + !isdir(job_directory) && mkdir(job_directory) + return job_directory +end - @info "Starting SLURM job $jobname: $srun_cmd" - srun_proc = open(srun_cmd) +function propagate_env_vars!(env) + if get(env, "JULIA_LOAD_PATH", nothing) === nothing + env["JULIA_LOAD_PATH"] = join(LOAD_PATH, ":") + end + if get(env, "JULIA_DEPOT_PATH", nothing) === nothing + env["JULIA_DEPOT_PATH"] = join(DEPOT_PATH, ":") + end + project = Base.ACTIVE_PROJECT[] + if project !== nothing && get(env, "JULIA_PROJECT", nothing) === nothing + env["JULIA_PROJECT"] = project + end +end + +function poll_worker_startup( + job_output_file::String, + worker_index::Int, + t_start::Float64, + pid, +) # This Regex will match the worker's socket and IP address # Example: julia_worker:9015#169.254.3.1 - slurm_spec_regex = r"([\w]+):([\d]+)#(\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3})" + julia_worker_regex = r"([\w]+):([\d]+)#(\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3})" could_not_connect_regex = r"could not connect" exiting_regex = r"exiting." 
-
-    # Wait for workers to start
-    t_start = time()
-    t_waited = round(Int, time() - t_start)
+    worker_launch_details = nothing
+    worker_errors = String[]
     retry_delays = ExponentialBackOff(10, 1.0, 512.0, 2.0, 0.1)
-    for i in 0:(ntasks - 1)
-        slurm_spec_match = nothing
-        worker_errors = String[]
-        if !has_output_name
-            job_output_file = default_output(lpad(i, 4, "0"))
-        end
-        for retry_delay in push!(collect(retry_delays), 0)
-            t_waited = round(Int, time() - t_start)
-
-            # Wait for output log to be created and populated, then parse
-            if isfile(job_output_file)
-                if filesize(job_output_file) > 0
-                    open(job_output_file) do f
-                        # Due to error and warning messages, we need to check
-                        # for a regex match on each line
-                        for line in eachline(f)
-                            re_match = match(slurm_spec_regex, line)
-                            if !isnothing(re_match)
-                                slurm_spec_match = re_match
-                                break # We have found the match
-                            end
-                            for expr in [could_not_connect_regex, exiting_regex]
-                                if !isnothing(match(expr, line))
-                                    slurm_spec_match = nothing
-                                    push!(worker_errors, line)
-                                end
+    t_waited = nothing
+    for retry_delay in push!(collect(retry_delays), 0)
+        t_waited = round(Int, time() - t_start)
+
+        # Wait for output log to be created and populated, then parse
+        if isfile(job_output_file)
+            if filesize(job_output_file) > 0
+                open(job_output_file) do f
+                    # Due to error and warning messages, we need to check
+                    # for a regex match on each line
+                    for line in eachline(f)
+                        re_match = match(julia_worker_regex, line)
+                        if !isnothing(re_match)
+                            worker_launch_details = re_match
+                            break # We have found the match
+                        end
+                        for expr in [could_not_connect_regex, exiting_regex]
+                            if !isnothing(match(expr, line))
+                                worker_launch_details = nothing
+                                push!(worker_errors, line)
                             end
                         end
                     end
                 end
-                if !isempty(worker_errors) || !isnothing(slurm_spec_match)
-                    break # break if error or specification found
-                else
-                    @info "Worker $i (after $t_waited s): Output file found, but no connection details yet"
-                end
+            end
+            if !isempty(worker_errors) || !isnothing(worker_launch_details)
+                break # break if error or specification found
             else
-                @info "Worker $i (after $t_waited s): No output file \"$job_output_file\" yet"
+                @info "Worker $worker_index (after $t_waited s): Output file found, but no connection details yet"
             end
-
-            # Sleep for some time to limit resource usage while waiting for the job to start
-            sleep(retry_delay)
+        else
+            @info "Worker $worker_index (after $t_waited s): No output file \"$job_output_file\" yet"
         end
-        if !isempty(worker_errors)
-            throw(
-                ErrorException(
-                    "Worker $i failed after $t_waited s: $(join(worker_errors, " "))",
-                ),
-            )
-        elseif isnothing(slurm_spec_match)
-            throw(
-                ErrorException(
-                    "Timeout after $t_waited s while waiting for worker $i to get ready.",
-                ),
-            )
-        end
+        # Sleep for some time to limit resource usage while waiting for the job to start
+        sleep(retry_delay)
+    end
 
-        config = WorkerConfig()
-        config.port = parse(Int, slurm_spec_match[2])
-        config.host = strip(slurm_spec_match[3])
-        @info "Worker $i ready after $t_waited s on host $(config.host), port $(config.port)"
-        # Keep a reference to the proc, so it's properly closed once
-        # the last worker exits.
-        config.userdata = srun_proc
-        push!(instances_arr, config)
-        notify(c)
+    if !isempty(worker_errors)
+        throw(
+            ErrorException(
+                "Worker $worker_index failed after $t_waited s: $(join(worker_errors, " "))",
+            ),
+        )
+    elseif isnothing(worker_launch_details)
+        throw(
+            ErrorException(
+                "Timeout after $t_waited s while waiting for worker $worker_index to get ready.",
+            ),
+        )
     end
+    config = WorkerConfig()
+    config.port = parse(Int, worker_launch_details[2])
+    config.host = strip(worker_launch_details[3])
+    # Keep a reference to the proc, so it's properly closed once
+    # the last worker exits.
+    config.userdata = pid
+    @info "Worker $worker_index ready after $t_waited s on host $(config.host), port $(config.port)"
+    return config
 end
diff --git a/test/hpc_backend.jl b/test/hpc_backend.jl
index d625797a..30282447 100644
--- a/test/hpc_backend.jl
+++ b/test/hpc_backend.jl
@@ -25,3 +25,15 @@ julia_eki = calibrate(JuliaBackend, experiment_config)
 test_sf_calibration_output(julia_eki, prior, experiment_config.observations)
 
 compare_g_ensemble(eki, julia_eki)
+
+theta_star_vec =
+    (; coefficient_a_m_businger = 4.7, coefficient_a_h_businger = 4.7)
+
+convergence_plot(
+    eki,
+    prior,
+    theta_star_vec,
+    ["coefficient_a_m_businger", "coefficient_a_h_businger"],
+)
+
+g_vs_iter_plot(eki)
diff --git a/test/julia_backend.jl b/test/julia_backend.jl
index 3cf2031a..bb7e00a8 100644
--- a/test/julia_backend.jl
+++ b/test/julia_backend.jl
@@ -5,14 +5,7 @@ using EnsembleKalmanProcesses.ParameterDistributions
 using EnsembleKalmanProcesses.TOMLInterface
 import ClimaParams as CP
 
-import ClimaCalibrate:
-    forward_model,
-    JuliaBackend,
-    ExperimentConfig,
-    calibrate,
-    project_dir,
-    observation_map
-
+import ClimaCalibrate as CAL
 import JLD2
 
 # Experiment Info
@@ -24,7 +17,7 @@ observations = [20.0]
 noise = [0.01;;]
 output_dir = mktempdir()
 
-experiment_config = ExperimentConfig(
+experiment_config = CAL.ExperimentConfig(
     n_iterations,
     ensemble_size,
     observations,
@@ -36,15 +29,15 @@ experiment_config = ExperimentConfig(
 # Model interface
 # This "model" just samples parameters and returns them, we are checking that the
 # results are reproducible.
-function forward_model(iteration, member)
-    member_path = path_to_ensemble_member(output_dir, iteration, member)
-    parameter_path = joinpath(member_path, "parameters.toml")
-    toml_dict = CP.create_toml_dict(Float64; override_file = parameter_path)
+function CAL.forward_model(iteration, member)
+    member_path = CAL.path_to_ensemble_member(output_dir, iteration, member)
+    param_path = CAL.parameter_path(output_dir, iteration, member)
+    toml_dict = CP.create_toml_dict(Float64; override_file = param_path)
     (; test_param) = CP.get_parameter_values(toml_dict, "test_param")
     JLD2.save_object(joinpath(member_path, output_file), test_param)
 end
 
-function observation_map(iteration)
+function CAL.observation_map(iteration)
     (; ensemble_size) = experiment_config
     dims = 1
     G_ensemble = Array{Float64}(undef, dims..., ensemble_size)
@@ -58,7 +51,7 @@ end
 
 # Test!
-ekp = calibrate(JuliaBackend, experiment_config)
+ekp = CAL.calibrate(CAL.JuliaBackend, experiment_config)
 
 @testset "Test end-to-end calibration" begin
     parameter_values =
diff --git a/test/sf_calibration_utils.jl b/test/sf_calibration_utils.jl
index 0c82f299..089b1f5c 100644
--- a/test/sf_calibration_utils.jl
+++ b/test/sf_calibration_utils.jl
@@ -169,8 +169,7 @@ function g_vs_iter_plot(eki)
         model_config["toml"] = [
             joinpath(
                 pkg_dir,
-                ClimaCalibrate.path_to_ensemble_member(output_dir, iter, i),
-                "parameters.toml",
+                ClimaCalibrate.parameter_path(output_dir, iter, i),
             ),
         ]
         ustar_mod =