There is a memory leak in the `run_estimation` function. It appears to happen in the for loop over the different cells. Freeing memory by deleting the objects created within this loop did not help. Up to 100 MB of additional memory is retained for each iteration of the loop. This is a bug because nothing needs to persist from one iteration to the next.
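For context, the cleanup that did not help looked roughly like this (a minimal sketch; the variable names are the loop-locals from the script at the end of this issue):

```python
import gc

for n in run_numbers:
    # ... estimation for one cell ...

    # Drop all references to the large per-cell objects and force a
    # garbage collection. In my runs this did not release the extra
    # ~100 MB retained per iteration.
    del df, dff, trace, df_with_cfact
    gc.collect()
```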
Some further research indicated that this problem might be rooted in the Theano package [1, 2]. That might also be the reason it did not come up in the updated PyMC version.
An open question is why this only came up now and not in earlier runs of the ATTRICI-PyMC3 code. I believe the reason is that we did not run enough cells within a single job for memory to ever fill up.
A quick fix is to not run too many cells within one node. However, this is not a permanent solution, because it is quite likely that others who want to use ATTRICI will run into the same problem, and they likely do not have the same setup with a "standby queue" where parallelization over many independent jobs is possible.
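A sketch of that quick fix, assuming a hard-coded cap (the `MAX_CELLS_PER_TASK` value is a placeholder; it corresponds to the debugging `break` visible in the profiler output below):

```python
# Hypothetical cap on the number of cells one Slurm task processes;
# in practice it would be tuned to the node's available memory.
MAX_CELLS_PER_TASK = 10

for n in run_numbers:
    if (n - start_num) >= MAX_CELLS_PER_TASK:
        print("Per-task cell limit reached, stopping before memory fills up.")
        break
    # ... estimation for one cell ...
```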
A proper solution might be to set up the PyMC model only once and reuse it for subsequent cells [see discussion in 1]. This might also give a considerable performance improvement because the model does not need to be recompiled.
Maybe this is also why the code is faster with the updated PyMC: the compilation overhead is reduced. But this is just speculation for now.
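To make the reuse idea concrete, here is a minimal sketch in PyMC3/Theano terms, assuming all cells have time series of the same fixed length so per-cell data can be swapped into `theano.shared` containers. The model and variable names are illustrative, not ATTRICI's actual model:

```python
import numpy as np
import pymc3 as pm
import theano

N_TIMESTEPS = 1000  # assumed fixed length of every cell's time series

# Shared containers: the compiled Theano graph references these,
# so new data can be swapped in without rebuilding the model.
gmt_shared = theano.shared(np.zeros(N_TIMESTEPS))
y_shared = theano.shared(np.zeros(N_TIMESTEPS))

# Build (and thereby compile) the model once.
with pm.Model() as model:
    intercept = pm.Normal("intercept", mu=0.0, sigma=10.0)
    slope = pm.Normal("slope", mu=0.0, sigma=10.0)
    sigma = pm.HalfNormal("sigma", sigma=1.0)
    pm.Normal("y", mu=intercept + slope * gmt_shared, sigma=sigma, observed=y_shared)

# Reuse the same model for every cell: only the data changes.
for cell_y in per_cell_observations:  # hypothetical iterable of 1-D arrays
    y_shared.set_value(cell_y)
    with model:
        map_estimate = pm.find_MAP()
```

Note that changing the shape of the shared data would still trigger a recompile, so this only avoids recompilation if every cell's series has the same length.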
I attach the output of memory_profiler below:
```
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
   111   1125.6 MiB      0.0 MiB          11   for n in run_numbers[:]:
   112                                             # todo only for debugging
   113   1125.6 MiB      0.0 MiB          11       if (n - start_num) >= 10:
   114   1125.6 MiB      0.0 MiB           1           break
   115
   116   1036.7 MiB      0.0 MiB          10       estimator = est.estimator(s)
   117   1036.7 MiB      4.5 MiB          10       sp = df_specs.loc[n, :]
   118
   119                                             # if lat > 20: continue
   120   1036.7 MiB      0.0 MiB          10       print(
   121   1036.7 MiB      0.0 MiB          10           "This is SLURM task",
   122   1036.7 MiB      0.0 MiB          10           task_id,
   123   1036.7 MiB      0.0 MiB          10           "run number",
   124   1036.7 MiB      0.0 MiB          10           n,
   125   1036.7 MiB      0.0 MiB          10           "lat,lon",
   126   1036.7 MiB      0.0 MiB          10           sp["lat"],
   127   1036.7 MiB      0.0 MiB          10           sp["lon"],
   128                                             )
   129   1036.7 MiB      0.0 MiB          10       print(f"Memory usage at start: {memory_usage(-1, interval=0.1, timeout=1)} MiB")
   130   1036.7 MiB      0.0 MiB          10       outdir_for_cell = dh.make_cell_output_dir(
   131   1036.7 MiB      0.0 MiB          10           s.output_dir, "timeseries", sp["lat"], sp["lon"], s.variable
   132                                             )
   133   1036.7 MiB      0.0 MiB          10       fname_cell = dh.get_cell_filename(outdir_for_cell, sp["lat"], sp["lon"], s)
   134
   135   1036.7 MiB      0.0 MiB          10       if s.skip_if_data_exists:
   136                                                 try:
   137                                                     dh.test_if_data_valid_exists(fname_cell)
   138                                                     print(f"Existing valid data in {fname_cell} . Skip calculation.")
   139                                                     continue
   140                                                 except Exception as e:
   141                                                     print(e)
   142                                                     print("No valid data found. Run calculation.")
   143
   144   1036.7 MiB      0.0 MiB          10       data = obs_data.variables[s.variable][:, sp["index_lat"], sp["index_lon"]]
   145   1036.7 MiB      0.0 MiB          10       df, datamin, scale = dh.create_dataframe(
   146   1036.7 MiB      8.2 MiB          10           nct[:], nct.units, data, gmt, s.variable
   147                                             )
   148
   149   1036.7 MiB      0.0 MiB          10       try:
   150   1036.7 MiB      0.0 MiB          10           print(
   151   1036.7 MiB      0.0 MiB          10               f"took {(datetime.now() - TIME0).total_seconds()} seconds till estimator.estimate_parameters is started"
   152                                                 )
   153   1036.7 MiB      0.0 MiB          10           trace, dff = func_timeout(
   154   1036.7 MiB      0.0 MiB          10               s.timeout,
   155   1036.7 MiB      0.0 MiB          10               estimator.estimate_parameters,
   156   1114.9 MiB    960.6 MiB          10               args=(df, sp["lat"], sp["lon"], s.map_estimate, TIME0),
   157                                                 )
   158                                             except (FunctionTimedOut, ValueError) as error:
   159                                                 raise error
   160                                                 # if str(error) == "Modes larger 1 are not allowed for the censored model.":
   161                                                 #     raise error
   162                                                 # else:
   163                                                 #     print("Sampling at", sp["lat"], sp["lon"], " timed out or failed.")
   164                                                 #     print(error)
   165                                                 #     logger.error(
   166                                                 #         str(
   167                                                 #             "lat,lon: "
   168                                                 #             + str(sp["lat"])
   169                                                 #             + " "
   170                                                 #             + str(sp["lon"])
   171                                                 #             + " : "
   172                                                 #             + str(error)
   173                                                 #         )
   174                                                 #     )
   175                                                 #     continue
   176
   177   1114.9 MiB      0.0 MiB          10       df_with_cfact = estimator.estimate_timeseries(
   178   1225.6 MiB    894.7 MiB          10           dff, trace, datamin, scale, s.map_estimate
   179                                             )
   180   1225.6 MiB      0.0 MiB          10       dh.save_to_disk(
   181   1225.6 MiB      2.1 MiB          10           df_with_cfact, fname_cell, sp["lat"], sp["lon"], s.storage_format
   182                                             )
   183   1225.6 MiB      0.0 MiB          10       df = None
   184   1125.6 MiB   -930.7 MiB          10       dff = None
   185
   186   1125.6 MiB      0.0 MiB          10       print(f"Memory usage at end: {memory_usage(-1, interval=0.1, timeout=1)} MiB")
   187
   188   1125.6 MiB      0.0 MiB           1   obs_data.close()
   189   1125.6 MiB      0.0 MiB           1   nc_lsmask.close()
   190   1125.6 MiB      0.0 MiB           1   print(
   191   1125.6 MiB      0.0 MiB           1       "Estimation completed for all cells. It took {0:.1f} minutes.".format(
   192   1125.6 MiB      0.0 MiB           1           (datetime.now() - TIME0).total_seconds() / 60
   193                                             )
   194                                         )
```
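For reference, line-by-line tables like the one above can be produced by decorating the function of interest with memory_profiler's `profile` decorator; my exact invocation may have differed slightly:

```python
from memory_profiler import profile

@profile
def run_estimation():
    # main loop over cells goes here
    ...

run_estimation()
```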
Below is the run_estimation.py code I used for profiling:
```python
import logging
import os
from datetime import datetime
from pathlib import Path

import attrici
import attrici.datahandler as dh
import attrici.estimator as est
import netCDF4 as nc
import numpy as np
import pandas as pd
from func_timeout import FunctionTimedOut, func_timeout
from memory_profiler import memory_usage

import settings as s

print(f"memory usage at the beginning: {memory_usage(-1, interval=0.1, timeout=1)} MiB")

s.output_dir.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    filename=s.output_dir / "failing_cells.log",
    level=logging.ERROR,
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
logger = logging.getLogger(__name__)
# needed to silence verbose pymc3
pmlogger = logging.getLogger("pymc3")
pmlogger.propagate = False

print("Version", attrici.__version__)

try:
    submitted = os.environ["SUBMITTED"] == "1"
    task_id = int(os.environ["SLURM_ARRAY_TASK_ID"])
    njobarray = int(os.environ["SLURM_ARRAY_TASK_COUNT"])
    s.ncores_per_job = 1
    s.progressbar = False
except KeyError:
    submitted = False
    njobarray = 1
    task_id = 0
    s.progressbar = True

dh.create_output_dirs(s.output_dir)

gmt_file = s.input_dir / s.gmt_file
ncg = nc.Dataset(gmt_file, "r")
gmt = np.squeeze(ncg.variables["tas"][:])
ncg.close()

input_file = s.input_dir / s.source_file
landsea_mask_file = s.input_dir / s.landsea_file

obs_data = nc.Dataset(input_file, "r")
nc_lsmask = nc.Dataset(landsea_mask_file, "r")
nct = obs_data.variables["time"]
lats = obs_data.variables["lat"][:]
lons = obs_data.variables["lon"][:]
longrid, latgrid = np.meshgrid(lons, lats)
jgrid, igrid = np.meshgrid(np.arange(len(lons)), np.arange(len(lats)))

ls_mask = nc_lsmask.variables["mask"][:, :]
df_specs = pd.DataFrame()
df_specs["lat"] = latgrid[ls_mask == 1]
df_specs["lon"] = longrid[ls_mask == 1]
df_specs["index_lat"] = igrid[ls_mask == 1]
df_specs["index_lon"] = jgrid[ls_mask == 1]

print("A total of", len(df_specs), "grid cells to estimate.")

if len(df_specs) % (njobarray) == 0:
    print("Grid cells can be equally distributed to Slurm tasks")
    calls_per_arrayjob = np.ones(njobarray) * len(df_specs) // (njobarray)
else:
    print("Slurm tasks not a divisor of number of grid cells, discard some cores.")
    calls_per_arrayjob = np.ones(njobarray) * len(df_specs) // (njobarray) + 1
    discarded_jobs = np.where(np.cumsum(calls_per_arrayjob) > len(df_specs))
    calls_per_arrayjob[discarded_jobs] = 0
    calls_per_arrayjob[discarded_jobs[0][0]] = len(df_specs) - calls_per_arrayjob.sum()

assert calls_per_arrayjob.sum() == len(df_specs)
# print(calls_per_arrayjob)

# Calculate the starting and ending values for this task based
# on the SLURM task and the number of runs per task.
cum_calls_per_arrayjob = calls_per_arrayjob.cumsum(dtype=int)
start_num = 0 if task_id == 0 else cum_calls_per_arrayjob[task_id - 1]
end_num = cum_calls_per_arrayjob[task_id] - 1
run_numbers = np.arange(start_num, end_num + 1, 1, dtype=int)
if len(run_numbers) == 0:
    print("No runs assigned for this SLURM task.")
else:
    print("This is SLURM task", task_id, "which will do runs", start_num, "to", end_num)

estimator = est.estimator(s)
TIME0 = datetime.now()

for n in run_numbers[:]:
    sp = df_specs.loc[n, :]
    # if lat > 20: continue
    print(
        "This is SLURM task", task_id, "run number", n, "lat,lon", sp["lat"], sp["lon"]
    )
    print(f"Memory usage at start: {memory_usage(-1, interval=0.1, timeout=1)} MiB")
    outdir_for_cell = dh.make_cell_output_dir(
        s.output_dir, "timeseries", sp["lat"], sp["lon"], s.variable
    )
    fname_cell = dh.get_cell_filename(outdir_for_cell, sp["lat"], sp["lon"], s)
    if s.skip_if_data_exists:
        try:
            dh.test_if_data_valid_exists(fname_cell)
            print(f"Existing valid data in {fname_cell} . Skip calculation.")
            continue
        except Exception as e:
            print(e)
            print("No valid data found. Run calculation.")
    data = obs_data.variables[s.variable][:, sp["index_lat"], sp["index_lon"]]
    df, datamin, scale = dh.create_dataframe(nct[:], nct.units, data, gmt, s.variable)
    try:
        print(
            f"took {(datetime.now() - TIME0).total_seconds()} seconds till estimator.estimate_parameters is started"
        )
        trace, dff = func_timeout(
            s.timeout,
            estimator.estimate_parameters,
            args=(df, sp["lat"], sp["lon"], s.map_estimate, TIME0),
        )
    except (FunctionTimedOut, ValueError) as error:
        raise error
        # if str(error) == "Modes larger 1 are not allowed for the censored model.":
        #     raise error
        # else:
        #     print("Sampling at", sp["lat"], sp["lon"], " timed out or failed.")
        #     print(error)
        #     logger.error(
        #         str(
        #             "lat,lon: "
        #             + str(sp["lat"])
        #             + " "
        #             + str(sp["lon"])
        #             + " : "
        #             + str(error)
        #         )
        #     )
        #     continue
    df_with_cfact = estimator.estimate_timeseries(
        dff, trace, datamin, scale, s.map_estimate
    )
    dh.save_to_disk(df_with_cfact, fname_cell, sp["lat"], sp["lon"], s.storage_format)
    print(f"Memory usage at end: {memory_usage(-1, interval=0.1, timeout=1)} MiB")

obs_data.close()
nc_lsmask.close()
print(
    "Estimation completed for all cells. It took {0:.1f} minutes.".format(
        (datetime.now() - TIME0).total_seconds() / 60
    )
)
```