Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a number of problems when running in multiprocessing mode #312

Merged
merged 25 commits into from
Dec 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
afad216
seems like doOpt should be False if we are using the lsst_optics phot…
jchiang87 Nov 9, 2022
2c3f753
read/write atm_psf to file if save_file is provided
jchiang87 Nov 9, 2022
0600f87
only print z-offset if it's non-zero
jchiang87 Nov 9, 2022
4495608
Merge branch 'main' into u/jchiang/read_psf_file
jchiang87 Nov 10, 2022
fb2b561
Merge branch 'main' into u/jchiang/read_psf_file
jchiang87 Dec 8, 2022
3402268
Monkey-patch GalSim's InputProxy function
rmjarvis Dec 6, 2022
e549074
Need to use get, not [] for input objects.
rmjarvis Dec 6, 2022
74b3120
Don't use self.logger in TreeRing class.
rmjarvis Dec 6, 2022
7707d2f
Fix logger message for sky level
rmjarvis Dec 7, 2022
e45d915
Remove gaussianFWHM option of AtmosphericPSF
rmjarvis Dec 7, 2022
b42e4d8
Remove AtmosphericPSFBuilder wrapper class
rmjarvis Dec 7, 2022
4945f35
Silence a fits warning about long header keyword
rmjarvis Dec 7, 2022
b12a0fb
Make test files runnable (those that weren't already)
rmjarvis Dec 7, 2022
ee29e55
Don't spend ages running test_wcs_fit in regular unit testing
rmjarvis Dec 7, 2022
851ff2c
Use custom loader for AtmosphericPSF
rmjarvis Dec 8, 2022
4c7ed11
Put back the generic save_psf and load_psf
rmjarvis Dec 8, 2022
fbc111f
save_file is not actually required. Let it be optional.
rmjarvis Dec 8, 2022
a5bf331
Need random_seed for AtmosphericPSF in tests
rmjarvis Dec 8, 2022
5f212a9
Add test of save_file option in AtmosphericPSF
rmjarvis Dec 8, 2022
8406320
Move test in test_atmPSF into test_psf with other psf tests
rmjarvis Dec 8, 2022
260d302
Add test that random_seed is required
rmjarvis Dec 8, 2022
f0a67d5
Test that random_seed can be a list
rmjarvis Dec 8, 2022
c4b7764
Add TODO to remove monkey patch when possible
rmjarvis Dec 9, 2022
7481503
Merge remote-tracking branch 'origin/main' into multiproc_fixes
rmjarvis Dec 9, 2022
44e46b1
Add more tests of sky_gradient including one that would have caught t…
rmjarvis Dec 9, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/imsim-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ input:
kcrit: 0.2 # in units of 1/r0
screen_size: 409.6 # Default=812.2, which takes a lot of memory, so use this for testing.
screen_scale: 0.1 # meters
doOpt: True
doOpt: False
nproc: 1 # Default (None) means one proc per screen.

# TODO:
Expand Down
217 changes: 113 additions & 104 deletions imsim/atmPSF.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
GalSim realistic atmospheric PSF class
"""

import os
import multiprocessing
import logging
import numpy as np
from scipy.optimize import bisect

import pickle
import galsim
from galsim.config import InputLoader, RegisterInputType, RegisterObjectType

from .optical_system import OpticalZernikes, mock_deviations

Expand All @@ -23,18 +25,12 @@ def save_psf(psf, outfile):
with galsim.utilities.pickle_shared():
pickle.dump(psf, output)

def load_psf(psf_file, log_level='INFO'):
def load_psf(psf_file):
"""
Load a psf from a pickle file.
"""
with open(psf_file, 'rb') as fd:
psf = pickle.load(fd)

# Since save_psf sets any logger attribute to None, restore
# it here.
if hasattr(psf, 'logger'):
psf.logger = get_logger(log_level, 'psf')

return psf


Expand Down Expand Up @@ -93,32 +89,31 @@ class AtmosphericPSF(object):
@param airmass Airmass of observation
@param rawSeeing The wavelength=500nm, zenith FWHM of the seeing
@param band One of ['u','g','r','i','z','y']
@param boresight The CelestialCoord of the boresight of the observation.
@param rng galsim.BaseDeviate
@param t0 Exposure time start in seconds. default: 0.
@param exptime Exposure time in seconds. default: 30.
@param kcrit Critical Fourier mode at which to split first and second kicks
in units of (1/r0). default: 0.2
@param gaussianFWHM FWHM of additional Gaussian profile by which to convolve.
This is useful to represent contributions of physical effects
not otherwise explicitly modeled. The default value of 0.3
arcsec is set assuming that doOpt=True and that the sensor
physics is enabled. If this is not the case, then it may be
appropriate to increase this value to account for the missing
contribution of these effects.
@param screen_size Size of the phase screens in meters. default: 819.2
@param screen_scale Size of phase screen "pixels" in meters. default: 0.1
@param doOpt Add in optical phase screens? default: True
@param logger Optional logger. default: None
@param nproc Number of processes to use in creating screens. If None (default),
then allocate one process per phase screen, of which there are 6,
nominally.
@param save_file A file name to use for saving the built atmosphere. If the file already
exists, then the atmosphere is read in from this file, rather than be
rebuilt. [default: None]
"""
def __init__(self, airmass, rawSeeing, band, rng,
t0=0.0, exptime=30.0, kcrit=0.2, gaussianFWHM=0.3,
def __init__(self, airmass, rawSeeing, band, boresight, rng,
t0=0.0, exptime=30.0, kcrit=0.2,
screen_size=819.2, screen_scale=0.1, doOpt=True, logger=None,
nproc=None):
nproc=None, save_file=None):
self.airmass = airmass
self.rawSeeing = rawSeeing
self.boresight = boresight
self.logger = galsim.config.LoggerWrapper(logger)

self.wlen_eff = dict(u=365.49, g=480.03, r=622.20, i=754.06, z=868.21, y=991.66)[band]
# wlen_eff is from Table 2 of LSE-40 (y=y2)
Expand All @@ -130,7 +125,33 @@ def __init__(self, airmass, rawSeeing, band, rng,
self.exptime = exptime
self.screen_size = screen_size
self.screen_scale = screen_scale
self.logger = logger

if save_file and os.path.isfile(save_file):
self.logger.warning(f'Reading atmospheric PSF from {save_file}')
self.load_psf(save_file)
else:
self.logger.warning('Buidling atmospheric PSF')
self._build_atm(kcrit, doOpt, nproc)
if save_file:
self.logger.warning(f'Saving atmospheric PSF to {save_file}')
self.save_psf(save_file)

def save_psf(self, save_file):
"""
Save the psf as a pickle file.
"""
with open(save_file, 'wb') as fd:
with galsim.utilities.pickle_shared():
pickle.dump((self.atm, self.aper), fd)

def load_psf(self, save_file):
"""
Load a psf from a pickle file.
"""
with open(save_file, 'rb') as fd:
self.atm, self.aper = pickle.load(fd)

def _build_atm(self, kcrit, doOpt, nproc):

ctx = multiprocessing.get_context('fork')
self.atm = galsim.Atmosphere(mp_context=ctx, **self._getAtmKwargs())
Expand All @@ -142,31 +163,26 @@ def __init__(self, airmass, rawSeeing, band, rng,
r0_500 = self.atm.r0_500_effective
r0 = r0_500 * (self.wlen_eff/500.0)**(6./5)
kmax = kcrit / r0
if logger:
logger.warning("Building atmosphere")
self.logger.info("Instantiating atmospheric screens")

if nproc is None:
nproc = len(self.atm)

if nproc == 1:
self.atm.instantiate(kmax=kmax, check='phot')
else:
if logger:
logger.warning(f"Using {nproc} processes to build the phase screens")
self.logger.warning(f"Using {nproc} processes to build the phase screens")
with galsim.utilities.single_threaded():
with ctx.Pool(nproc, initializer=galsim.phase_screens.initWorker,
initargs=galsim.phase_screens.initWorkerArgs()) as pool:
self.atm.instantiate(pool=pool, kmax=kmax, check='phot')

if logger:
logger.info("Finished building atmosphere")
self.logger.info("Finished building atmosphere")
self.logger.debug("GSScreenShare keys = %s",list(galsim.phase_screens._GSScreenShare.keys()))
self.logger.debug("id(self) = %s",id(self))

if doOpt:
self.atm.append(OptWF(rng, self.wlen_eff))

if logger and gaussianFWHM > 0:
logger.debug("Convolving in Gaussian with FWHM = {}".format(gaussianFWHM))
self.gaussianFWHM = gaussianFWHM
self.atm.append(OptWF(self.rng, self.wlen_eff))

def __eq__(self, rhs):
return (isinstance(rhs, AtmosphericPSF)
Expand All @@ -176,8 +192,7 @@ def __eq__(self, rhs):
and self.t0 == rhs.t0
and self.exptime == rhs.exptime
and self.atm == rhs.atm
and self.aper == rhs.aper
and self.gaussianFWHM == rhs.gaussianFWHM)
and self.aper == rhs.aper)

@staticmethod
def _vkSeeing(r0_500, wavelength, L0):
Expand Down Expand Up @@ -258,102 +273,97 @@ def _getAtmKwargs(self):
altitude=altitudes, r0_weights=weights, rng=self.rng,
screen_size=self.screen_size, screen_scale=self.screen_scale)

def _getPSF(self, xPupil=None, yPupil=None, gsparams=None):
def getPSF(self, field_pos, gsparams=None):
"""
Return a PSF to be convolved with sources.

@param [in] xPupil the x coordinate on the pupil in arc seconds

@param [in] yPupil the y coordinate on the pupil in arc seconds
@param [in] field position of the object relative to the boresight direction.
"""
theta = (xPupil*galsim.arcsec, yPupil*galsim.arcsec)

theta = (field_pos.x*galsim.arcsec, field_pos.y*galsim.arcsec)
psf = self.atm.makePSF(
self.wlen_eff,
aper=self.aper,
theta=theta,
t0=self.t0,
exptime=self.exptime,
gsparams=gsparams)
if self.gaussianFWHM > 0.0:
psf = galsim.Convolve(
galsim.Gaussian(fwhm=self.gaussianFWHM, gsparams=gsparams),
psf
)
return psf

class AtmLoader(InputLoader):
"""Custom AtmosphericPSF loader that only loads the atmosphere once per exposure.

# Note: Except for the PsfBase base class, Everything above is identical to what is in the main
# imsim repo. Below is all the new stuff.

# Note: This could be refactored to avoid this bit of indirection. This class is a very
# thin wrapper around the above AtmosphericPSF class, just adjusting for a couple differences
# I wanted to make.
class AtmosphericPSFBuilder(object):
Note: For now, this just loads the atmosphere once for an entire imsim run.
If we ever decide we want to have a single config processing run handle multiple
exposures (rather than just multiple CCDs for a single exposure), we'll need to
reconsider this implementation.
"""
@param airmass Airmass of observation
@param rawSeeing The wavelength=500nm, zenith FWHM of the seeing
@param band One of ['u','g','r','i','z','y']
@param boresight The CelestialCoord of the boresight of the observation.
@param t0 Exposure time start in seconds. default: 0.
@param exptime Exposure time in seconds. default: 30.
@param kcrit Critical Fourier mode at which to split first and second kicks
in units of (1/r0). default: 0.2
@param screen_size Size of the phase screens in meters. default: 819.2
@param screen_scale Size of phase screen "pixels" in meters. default: 0.1
@param doOpt Add in optical phase screens? default: True
@param nproc Number of processes to use in creating screens. If None (default),
then allocate one process per phase screen, of which there are 6,
nominally.
@param save_file Not Implemented (TODO).
@param logger Optional logger. default: None
"""
_req_params = { 'airmass' : float,
'rawSeeing' : float,
'band' : str,
'boresight' : galsim.CelestialCoord,
}
_opt_params = { 't0' : float,
'exptime' : float,
'kcrit' : float,
'screen_size' : float,
'screen_scale' : float,
'doOpt' : bool,
'nproc' : int,
'save_file' : str,
}
_takes_rng = True

def __init__(self, airmass, rawSeeing, band, boresight, rng,
t0=0, exptime=30, kcrit=0.2, screen_size=819.2, screen_scale=0.1, doOpt=True,
nproc=None, save_file=None, logger=None):
# Note: one change from the above is that we don't do the gaussian part, so set that to 0.
# Instead, the user can choose to convolve this by a Gaussian in the config file.
self.atm = AtmosphericPSF(airmass, rawSeeing, band, rng,
t0=t0, exptime=exptime, kcrit=kcrit, gaussianFWHM=0.,
screen_size=screen_size, screen_scale=screen_scale,
doOpt=doOpt, logger=logger, nproc=nproc)
# The other change is that we need the boresight to do image_pos -> field_pos
# I think it makes sense to take that as an input here rather than in the
# PSF object each time (since it's the same for the whole observation).
self.boresight = boresight

def getBoresight(self):
return self.boresight

def getPSF(self, field_pos, gsparams):
return self.atm._getPSF(field_pos.x, field_pos.y, gsparams)
def getKwargs(self, config, base, logger):
logger.debug("Get kwargs for AtmosphericPSF")

req_params = { 'airmass' : float,
'rawSeeing' : float,
'band' : str,
'boresight' : galsim.CelestialCoord,
}
opt_params = { 't0' : float,
'exptime' : float,
'kcrit' : float,
'screen_size' : float,
'screen_scale' : float,
'doOpt' : bool,
'nproc' : int,
'save_file' : str,
}
kwargs, _ = galsim.config.GetAllParams(config, base, req=req_params, opt=opt_params)
logger.debug("kwargs = %s",kwargs)

# We want this to be set up right at the beginning of the run, before the config
# stuff has even set up the RNG yet. So make an rng ourselves based on the
# random seed in image.random_seed.

seed = base['image'].get('random_seed', None)
if seed is None:
raise RuntimeError("AtmLoader requires a seed in config['image']['random_seed']")
if isinstance(seed, list):
seed = seed[0] # If random_seed is a list, just use the first one.
# Parse the value in case it is an eval or something.
seed = galsim.config.ParseValue({'seed': seed}, 'seed', base, int)[0]
# Somewhat gratuitously add an aribtary value to this to avoid any correlations with
# other uses of this random seed elsewhere in the config processing.
seed += 271828
rng = galsim.BaseDeviate(seed)
kwargs['rng'] = rng
logger.debug("seed for atm = %s",seed)

# Include the logger
kwargs['logger'] = logger

# safe=True means this will be used for the whole run.
#safe = True
# TODO: However, this isn't working yet, since the multiple processes aren't getting the
# GSScreenShare dict updated properly. For now, we rely on the base process making the
# atmosphere if necessary and saving it to save_file. Then other processes will
# load it in. This happens if safe=False.
safe=False

return kwargs, safe


def BuildAtmosphericPSF(config, base, ignore, gsparams, logger):
"""Build an AtmosphericPSF from the information in the config file.
"""
atm = galsim.config.GetInputObj('atm_psf', config, base, 'AtmosphericPSF')
image_pos = base['image_pos']
boresight = atm.getBoresight()
boresight = atm.boresight
field_pos = base['wcs'].posToWorld(image_pos, project_center=boresight)
if gsparams: gsparams = GSParams(**gsparams)
else: gsparams = None

#logger.debug("Making PSF for pos %s",image_pos)
#logger.debug("GSScreenShare keys = %s",list(galsim.phase_screens._GSScreenShare.keys()))
#logger.debug("type(atm) = %s",str(type(atm)))
#logger.debug("id(atm) = %s",id(atm))
psf = atm.getPSF(field_pos, gsparams)
return psf, False

Expand Down Expand Up @@ -451,8 +461,7 @@ def BuildKolmogorovPSF(config, base, ignore, gsparams, logger):

return psf, safe

from galsim.config import InputLoader, RegisterInputType, RegisterObjectType
RegisterInputType('atm_psf', InputLoader(AtmosphericPSFBuilder, takes_logger=True))
RegisterInputType('atm_psf', AtmLoader(AtmosphericPSF, takes_logger=True))
RegisterObjectType('AtmosphericPSF', BuildAtmosphericPSF, input_type='atm_psf')
RegisterObjectType('DoubleGaussianPSF', BuildDoubleGaussianPSF)
RegisterObjectType('KolmogorovPSF', BuildKolmogorovPSF)
9 changes: 7 additions & 2 deletions imsim/ccd.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import warnings
import galsim
from galsim.config import OutputBuilder, RegisterOutputType
from .cosmic_rays import CosmicRays
Expand Down Expand Up @@ -55,7 +56,8 @@ def setup(self, config, base, file_num, logger):
ccd_orientation = camera[det_name].getOrientation()
if hasattr(ccd_orientation, 'getHeight'):
z_offset = ccd_orientation.getHeight()*1.0e-3 # Convert to meters.
logger.info("Setting CCD z-offset to %.2e m", z_offset)
if z_offset != 0:
logger.info("Setting CCD z-offset to %.2e m", z_offset)
else:
z_offset = 0
if "shift_optics" not in base:
Expand Down Expand Up @@ -149,7 +151,10 @@ def buildImages(self, config, base, file_num, image_num, obj_num, ignore, logger
dectel = opsim_md.get('fieldDec', 0.)
image.header['RATEL'] = ratel
image.header['DECTEL'] = dectel
image.header['ROTTELPOS'] = opsim_md.get('rotTelPos', 0.)
with warnings.catch_warnings():
# Silence FITS warning about long header keyword
warnings.simplefilter('ignore')
image.header['ROTTELPOS'] = opsim_md.get('rotTelPos', 0.)
image.header['FILTER'] = opsim_md.get('band')
image.header['CAMERA'] = base['output']['camera']
image.header['HASTART'] = opsim_md.getHourAngle(mjd_obs, ratel)
Expand Down
Loading