Skip to content

Commit 1c6a598

Browse files
authored
Merge pull request #312 from LSSTDESC/multiproc_fixes
Fix a number of problems when running in multiprocessing mode
2 parents fbe45be + 44e46b1 commit 1c6a598

18 files changed

+394
-158
lines changed

config/imsim-config.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ input:
8181
kcrit: 0.2 # in units of 1/r0
8282
screen_size: 409.6 # Default=812.2, which takes a lot of memory, so use this for testing.
8383
screen_scale: 0.1 # meters
84-
doOpt: True
84+
doOpt: False
8585
nproc: 1 # Default (None) means one proc per screen.
8686

8787
# TODO:

imsim/atmPSF.py

+113-104
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22
GalSim realistic atmospheric PSF class
33
"""
44

5+
import os
56
import multiprocessing
7+
import logging
68
import numpy as np
79
from scipy.optimize import bisect
8-
910
import pickle
1011
import galsim
12+
from galsim.config import InputLoader, RegisterInputType, RegisterObjectType
1113

1214
from .optical_system import OpticalZernikes, mock_deviations
1315

@@ -23,18 +25,12 @@ def save_psf(psf, outfile):
2325
with galsim.utilities.pickle_shared():
2426
pickle.dump(psf, output)
2527

26-
def load_psf(psf_file, log_level='INFO'):
28+
def load_psf(psf_file):
2729
"""
2830
Load a psf from a pickle file.
2931
"""
3032
with open(psf_file, 'rb') as fd:
3133
psf = pickle.load(fd)
32-
33-
# Since save_psf sets any logger attribute to None, restore
34-
# it here.
35-
if hasattr(psf, 'logger'):
36-
psf.logger = get_logger(log_level, 'psf')
37-
3834
return psf
3935

4036

@@ -93,32 +89,31 @@ class AtmosphericPSF(object):
9389
@param airmass Airmass of observation
9490
@param rawSeeing The wavelength=500nm, zenith FWHM of the seeing
9591
@param band One of ['u','g','r','i','z','y']
92+
@param boresight The CelestialCoord of the boresight of the observation.
9693
@param rng galsim.BaseDeviate
9794
@param t0 Exposure time start in seconds. default: 0.
9895
@param exptime Exposure time in seconds. default: 30.
9996
@param kcrit Critical Fourier mode at which to split first and second kicks
10097
in units of (1/r0). default: 0.2
101-
@param gaussianFWHM FWHM of additional Gaussian profile by which to convolve.
102-
This is useful to represent contributions of physical effects
103-
not otherwise explicitly modeled. The default value of 0.3
104-
arcsec is set assuming that doOpt=True and that the sensor
105-
physics is enabled. If this is not the case, then it may be
106-
appropriate to increase this value to account for the missing
107-
contribution of these effects.
10898
@param screen_size Size of the phase screens in meters. default: 819.2
10999
@param screen_scale Size of phase screen "pixels" in meters. default: 0.1
110100
@param doOpt Add in optical phase screens? default: True
111101
@param logger Optional logger. default: None
112102
@param nproc Number of processes to use in creating screens. If None (default),
113103
then allocate one process per phase screen, of which there are 6,
114104
nominally.
105+
@param save_file A file name to use for saving the built atmosphere. If the file already
106+
exists, then the atmosphere is read in from this file, rather than be
107+
rebuilt. [default: None]
115108
"""
116-
def __init__(self, airmass, rawSeeing, band, rng,
117-
t0=0.0, exptime=30.0, kcrit=0.2, gaussianFWHM=0.3,
109+
def __init__(self, airmass, rawSeeing, band, boresight, rng,
110+
t0=0.0, exptime=30.0, kcrit=0.2,
118111
screen_size=819.2, screen_scale=0.1, doOpt=True, logger=None,
119-
nproc=None):
112+
nproc=None, save_file=None):
120113
self.airmass = airmass
121114
self.rawSeeing = rawSeeing
115+
self.boresight = boresight
116+
self.logger = galsim.config.LoggerWrapper(logger)
122117

123118
self.wlen_eff = dict(u=365.49, g=480.03, r=622.20, i=754.06, z=868.21, y=991.66)[band]
124119
# wlen_eff is from Table 2 of LSE-40 (y=y2)
@@ -130,7 +125,33 @@ def __init__(self, airmass, rawSeeing, band, rng,
130125
self.exptime = exptime
131126
self.screen_size = screen_size
132127
self.screen_scale = screen_scale
133-
self.logger = logger
128+
129+
if save_file and os.path.isfile(save_file):
130+
self.logger.warning(f'Reading atmospheric PSF from {save_file}')
131+
self.load_psf(save_file)
132+
else:
133+
self.logger.warning('Buidling atmospheric PSF')
134+
self._build_atm(kcrit, doOpt, nproc)
135+
if save_file:
136+
self.logger.warning(f'Saving atmospheric PSF to {save_file}')
137+
self.save_psf(save_file)
138+
139+
def save_psf(self, save_file):
140+
"""
141+
Save the psf as a pickle file.
142+
"""
143+
with open(save_file, 'wb') as fd:
144+
with galsim.utilities.pickle_shared():
145+
pickle.dump((self.atm, self.aper), fd)
146+
147+
def load_psf(self, save_file):
148+
"""
149+
Load a psf from a pickle file.
150+
"""
151+
with open(save_file, 'rb') as fd:
152+
self.atm, self.aper = pickle.load(fd)
153+
154+
def _build_atm(self, kcrit, doOpt, nproc):
134155

135156
ctx = multiprocessing.get_context('fork')
136157
self.atm = galsim.Atmosphere(mp_context=ctx, **self._getAtmKwargs())
@@ -142,31 +163,26 @@ def __init__(self, airmass, rawSeeing, band, rng,
142163
r0_500 = self.atm.r0_500_effective
143164
r0 = r0_500 * (self.wlen_eff/500.0)**(6./5)
144165
kmax = kcrit / r0
145-
if logger:
146-
logger.warning("Building atmosphere")
166+
self.logger.info("Instantiating atmospheric screens")
147167

148168
if nproc is None:
149169
nproc = len(self.atm)
150170

151171
if nproc == 1:
152172
self.atm.instantiate(kmax=kmax, check='phot')
153173
else:
154-
if logger:
155-
logger.warning(f"Using {nproc} processes to build the phase screens")
174+
self.logger.warning(f"Using {nproc} processes to build the phase screens")
156175
with galsim.utilities.single_threaded():
157176
with ctx.Pool(nproc, initializer=galsim.phase_screens.initWorker,
158177
initargs=galsim.phase_screens.initWorkerArgs()) as pool:
159178
self.atm.instantiate(pool=pool, kmax=kmax, check='phot')
160179

161-
if logger:
162-
logger.info("Finished building atmosphere")
180+
self.logger.info("Finished building atmosphere")
181+
self.logger.debug("GSScreenShare keys = %s",list(galsim.phase_screens._GSScreenShare.keys()))
182+
self.logger.debug("id(self) = %s",id(self))
163183

164184
if doOpt:
165-
self.atm.append(OptWF(rng, self.wlen_eff))
166-
167-
if logger and gaussianFWHM > 0:
168-
logger.debug("Convolving in Gaussian with FWHM = {}".format(gaussianFWHM))
169-
self.gaussianFWHM = gaussianFWHM
185+
self.atm.append(OptWF(self.rng, self.wlen_eff))
170186

171187
def __eq__(self, rhs):
172188
return (isinstance(rhs, AtmosphericPSF)
@@ -176,8 +192,7 @@ def __eq__(self, rhs):
176192
and self.t0 == rhs.t0
177193
and self.exptime == rhs.exptime
178194
and self.atm == rhs.atm
179-
and self.aper == rhs.aper
180-
and self.gaussianFWHM == rhs.gaussianFWHM)
195+
and self.aper == rhs.aper)
181196

182197
@staticmethod
183198
def _vkSeeing(r0_500, wavelength, L0):
@@ -258,102 +273,97 @@ def _getAtmKwargs(self):
258273
altitude=altitudes, r0_weights=weights, rng=self.rng,
259274
screen_size=self.screen_size, screen_scale=self.screen_scale)
260275

261-
def _getPSF(self, xPupil=None, yPupil=None, gsparams=None):
276+
def getPSF(self, field_pos, gsparams=None):
262277
"""
263278
Return a PSF to be convolved with sources.
264279
265-
@param [in] xPupil the x coordinate on the pupil in arc seconds
266-
267-
@param [in] yPupil the y coordinate on the pupil in arc seconds
280+
@param [in] field position of the object relative to the boresight direction.
268281
"""
269-
theta = (xPupil*galsim.arcsec, yPupil*galsim.arcsec)
282+
283+
theta = (field_pos.x*galsim.arcsec, field_pos.y*galsim.arcsec)
270284
psf = self.atm.makePSF(
271285
self.wlen_eff,
272286
aper=self.aper,
273287
theta=theta,
274288
t0=self.t0,
275289
exptime=self.exptime,
276290
gsparams=gsparams)
277-
if self.gaussianFWHM > 0.0:
278-
psf = galsim.Convolve(
279-
galsim.Gaussian(fwhm=self.gaussianFWHM, gsparams=gsparams),
280-
psf
281-
)
282291
return psf
283292

293+
class AtmLoader(InputLoader):
294+
"""Custom AtmosphericPSF loader that only loads the atmosphere once per exposure.
284295
285-
# Note: Except for the PsfBase base class, Everything above is identical to what is in the main
286-
# imsim repo. Below is all the new stuff.
287-
288-
# Note: This could be refactored to avoid this bit of indirection. This class is a very
289-
# thin wrapper around the above AtmosphericPSF class, just adjusting for a couple differences
290-
# I wanted to make.
291-
class AtmosphericPSFBuilder(object):
296+
Note: For now, this just loads the atmosphere once for an entire imsim run.
297+
If we ever decide we want to have a single config processing run handle multiple
298+
exposures (rather than just multiple CCDs for a single exposure), we'll need to
299+
reconsider this implementation.
292300
"""
293-
@param airmass Airmass of observation
294-
@param rawSeeing The wavelength=500nm, zenith FWHM of the seeing
295-
@param band One of ['u','g','r','i','z','y']
296-
@param boresight The CelestialCoord of the boresight of the observation.
297-
@param t0 Exposure time start in seconds. default: 0.
298-
@param exptime Exposure time in seconds. default: 30.
299-
@param kcrit Critical Fourier mode at which to split first and second kicks
300-
in units of (1/r0). default: 0.2
301-
@param screen_size Size of the phase screens in meters. default: 819.2
302-
@param screen_scale Size of phase screen "pixels" in meters. default: 0.1
303-
@param doOpt Add in optical phase screens? default: True
304-
@param nproc Number of processes to use in creating screens. If None (default),
305-
then allocate one process per phase screen, of which there are 6,
306-
nominally.
307-
@param save_file Not Implemented (TODO).
308-
@param logger Optional logger. default: None
309-
"""
310-
_req_params = { 'airmass' : float,
311-
'rawSeeing' : float,
312-
'band' : str,
313-
'boresight' : galsim.CelestialCoord,
314-
}
315-
_opt_params = { 't0' : float,
316-
'exptime' : float,
317-
'kcrit' : float,
318-
'screen_size' : float,
319-
'screen_scale' : float,
320-
'doOpt' : bool,
321-
'nproc' : int,
322-
'save_file' : str,
323-
}
324-
_takes_rng = True
325-
326-
def __init__(self, airmass, rawSeeing, band, boresight, rng,
327-
t0=0, exptime=30, kcrit=0.2, screen_size=819.2, screen_scale=0.1, doOpt=True,
328-
nproc=None, save_file=None, logger=None):
329-
# Note: one change from the above is that we don't do the gaussian part, so set that to 0.
330-
# Instead, the user can choose to convolve this by a Gaussian in the config file.
331-
self.atm = AtmosphericPSF(airmass, rawSeeing, band, rng,
332-
t0=t0, exptime=exptime, kcrit=kcrit, gaussianFWHM=0.,
333-
screen_size=screen_size, screen_scale=screen_scale,
334-
doOpt=doOpt, logger=logger, nproc=nproc)
335-
# The other change is that we need the boresight to do image_pos -> field_pos
336-
# I think it makes sense to take that as an input here rather than in the
337-
# PSF object each time (since it's the same for the whole observation).
338-
self.boresight = boresight
339-
340-
def getBoresight(self):
341-
return self.boresight
342-
343-
def getPSF(self, field_pos, gsparams):
344-
return self.atm._getPSF(field_pos.x, field_pos.y, gsparams)
301+
def getKwargs(self, config, base, logger):
302+
logger.debug("Get kwargs for AtmosphericPSF")
303+
304+
req_params = { 'airmass' : float,
305+
'rawSeeing' : float,
306+
'band' : str,
307+
'boresight' : galsim.CelestialCoord,
308+
}
309+
opt_params = { 't0' : float,
310+
'exptime' : float,
311+
'kcrit' : float,
312+
'screen_size' : float,
313+
'screen_scale' : float,
314+
'doOpt' : bool,
315+
'nproc' : int,
316+
'save_file' : str,
317+
}
318+
kwargs, _ = galsim.config.GetAllParams(config, base, req=req_params, opt=opt_params)
319+
logger.debug("kwargs = %s",kwargs)
320+
321+
# We want this to be set up right at the beginning of the run, before the config
322+
# stuff has even set up the RNG yet. So make an rng ourselves based on the
323+
# random seed in image.random_seed.
324+
325+
seed = base['image'].get('random_seed', None)
326+
if seed is None:
327+
raise RuntimeError("AtmLoader requires a seed in config['image']['random_seed']")
328+
if isinstance(seed, list):
329+
seed = seed[0] # If random_seed is a list, just use the first one.
330+
# Parse the value in case it is an eval or something.
331+
seed = galsim.config.ParseValue({'seed': seed}, 'seed', base, int)[0]
332+
# Somewhat gratuitously add an aribtary value to this to avoid any correlations with
333+
# other uses of this random seed elsewhere in the config processing.
334+
seed += 271828
335+
rng = galsim.BaseDeviate(seed)
336+
kwargs['rng'] = rng
337+
logger.debug("seed for atm = %s",seed)
338+
339+
# Include the logger
340+
kwargs['logger'] = logger
341+
342+
# safe=True means this will be used for the whole run.
343+
#safe = True
344+
# TODO: However, this isn't working yet, since the multiple processes aren't getting the
345+
# GSScreenShare dict updated properly. For now, we rely on the base process making the
346+
# atmosphere if necessary and saving it to save_file. Then other processes will
347+
# load it in. This happens if safe=False.
348+
safe=False
349+
350+
return kwargs, safe
345351

346352

347353
def BuildAtmosphericPSF(config, base, ignore, gsparams, logger):
348354
"""Build an AtmosphericPSF from the information in the config file.
349355
"""
350356
atm = galsim.config.GetInputObj('atm_psf', config, base, 'AtmosphericPSF')
351357
image_pos = base['image_pos']
352-
boresight = atm.getBoresight()
358+
boresight = atm.boresight
353359
field_pos = base['wcs'].posToWorld(image_pos, project_center=boresight)
354360
if gsparams: gsparams = GSParams(**gsparams)
355361
else: gsparams = None
356362

363+
#logger.debug("Making PSF for pos %s",image_pos)
364+
#logger.debug("GSScreenShare keys = %s",list(galsim.phase_screens._GSScreenShare.keys()))
365+
#logger.debug("type(atm) = %s",str(type(atm)))
366+
#logger.debug("id(atm) = %s",id(atm))
357367
psf = atm.getPSF(field_pos, gsparams)
358368
return psf, False
359369

@@ -451,8 +461,7 @@ def BuildKolmogorovPSF(config, base, ignore, gsparams, logger):
451461

452462
return psf, safe
453463

454-
from galsim.config import InputLoader, RegisterInputType, RegisterObjectType
455-
RegisterInputType('atm_psf', InputLoader(AtmosphericPSFBuilder, takes_logger=True))
464+
RegisterInputType('atm_psf', AtmLoader(AtmosphericPSF, takes_logger=True))
456465
RegisterObjectType('AtmosphericPSF', BuildAtmosphericPSF, input_type='atm_psf')
457466
RegisterObjectType('DoubleGaussianPSF', BuildDoubleGaussianPSF)
458467
RegisterObjectType('KolmogorovPSF', BuildKolmogorovPSF)

imsim/ccd.py

+7-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
import warnings
23
import galsim
34
from galsim.config import OutputBuilder, RegisterOutputType
45
from .cosmic_rays import CosmicRays
@@ -55,7 +56,8 @@ def setup(self, config, base, file_num, logger):
5556
ccd_orientation = camera[det_name].getOrientation()
5657
if hasattr(ccd_orientation, 'getHeight'):
5758
z_offset = ccd_orientation.getHeight()*1.0e-3 # Convert to meters.
58-
logger.info("Setting CCD z-offset to %.2e m", z_offset)
59+
if z_offset != 0:
60+
logger.info("Setting CCD z-offset to %.2e m", z_offset)
5961
else:
6062
z_offset = 0
6163
if "shift_optics" not in base:
@@ -149,7 +151,10 @@ def buildImages(self, config, base, file_num, image_num, obj_num, ignore, logger
149151
dectel = opsim_md.get('fieldDec', 0.)
150152
image.header['RATEL'] = ratel
151153
image.header['DECTEL'] = dectel
152-
image.header['ROTTELPOS'] = opsim_md.get('rotTelPos', 0.)
154+
with warnings.catch_warnings():
155+
# Silence FITS warning about long header keyword
156+
warnings.simplefilter('ignore')
157+
image.header['ROTTELPOS'] = opsim_md.get('rotTelPos', 0.)
153158
image.header['FILTER'] = opsim_md.get('band')
154159
image.header['CAMERA'] = base['output']['camera']
155160
image.header['HASTART'] = opsim_md.getHourAngle(mjd_obs, ratel)

0 commit comments

Comments
 (0)