diff --git a/activestorage/active.py b/activestorage/active.py
index fed673ca..841c3a60 100644
--- a/activestorage/active.py
+++ b/activestorage/active.py
@@ -68,7 +68,7 @@ def __new__(cls, *args, **kwargs):
         }
         return instance
 
-    def __init__(self, uri, ncvar, storage_type=None, missing_value=None, _FillValue=None, valid_min=None, valid_max=None, max_threads=100):
+    def __init__(self, uri, ncvar, storage_type=None, max_threads=100):
         """
         Instantiate with a NetCDF4 dataset and the variable of interest within that file.
         (We need the variable, because we need variable specific metadata from within that
@@ -92,65 +92,6 @@ def __init__(self, uri, ncvar, storage_type=None, missing_value=None, _FillValue
         self._method = None
         self._lock = False
         self._max_threads = max_threads
-
-        # obtain metadata, using netcdf4_python for now
-        # FIXME: There is an outstanding issue with ._FilLValue to be handled.
-        # If the user actually wrote the data with no fill value, or the
-        # default fill value is in play, then this might go wrong.
-        if storage_type is None:
-            ds = Dataset(uri)
-        elif storage_type == "s3":
-            with load_from_s3(uri) as _ds:
-                ds = _ds
-        try:
-            ds_var = ds[ncvar]
-        except IndexError as exc:
-            print(f"Dataset {ds} does not contain ncvar {ncvar!r}.")
-            raise exc
-
-        # FIXME: We do not get the correct byte order on the Zarr Array's dtype
-        # when using S3, so capture it here.
-        self._dtype = ds_var.dtype
-
-        if (missing_value, _FillValue, valid_min, valid_max) == (None, None, None, None):
-            if isinstance(ds, Dataset):
-                self._missing = getattr(ds_var, 'missing_value', None)
-                self._fillvalue = getattr(ds_var, '_FillValue', None)
-                # could be fill_value set as netCDF4 attr
-                if self._fillvalue is None:
-                    self._fillvalue = getattr(ds_var, 'fill_value', None)
-                valid_min = getattr(ds_var, 'valid_min', None)
-                valid_max = getattr(ds_var, 'valid_max', None)
-                valid_range = getattr(ds_var, 'valid_range', None)
-            elif storage_type == "s3":
-                self._missing = ds_var.attrs.get('missing_value')
-                self._fillvalue = ds_var.attrs.get('_FillValue')
-                # could be fill_value set as netCDF4 attr
-                if self._fillvalue is None:
-                    self._fillvalue = ds_var.attrs.get('fill_value')
-                valid_min = ds_var.attrs.get('valid_min')
-                valid_max = ds_var.attrs.get('valid_max')
-                valid_range = ds_var.attrs.get('valid_range')
-
-            if valid_max is not None or valid_min is not None:
-                if valid_range is not None:
-                    raise ValueError(
-                        "Invalid combination in the file of valid_min, "
-                        "valid_max, valid_range: "
-                        f"{valid_min}, {valid_max}, {valid_range}"
-                    )
-                valid_range = (valid_min, valid_max)
-            elif valid_range is None:
-                valid_range = (None, None)
-            self._valid_min, self._valid_max = valid_range
-
-        else:
-            self._missing = missing_value
-            self._fillvalue = _FillValue
-            self._valid_min = valid_min
-            self._valid_max = valid_max
-
-        ds.close()
 
     def __getitem__(self, index):
         """
@@ -174,22 +115,16 @@ def __getitem__(self, index):
         elif self.storage_type == "s3":
             with load_from_s3(self.uri) as nc:
                 data = nc[ncvar][index]
-                # h5netcdf doesn't return masked arrays.
-                if self._fillvalue:
-                    data = np.ma.masked_equal(data, self._fillvalue)
-                if self._missing:
-                    data = np.ma.masked_equal(data, self._missing)
-                if self._valid_max:
-                    data = np.ma.masked_greater(data, self._valid_max)
-                if self._valid_min:
-                    data = np.ma.masked_less(data, self._valid_min)
+                data = self._mask_data(data, nc[ncvar])
 
             if lock:
                 lock.release()
-
+
             return data
+
         elif self._version == 1:
             return self._via_kerchunk(index)
+
         elif self._version == 2:
             # No active operation either
             lock = self.lock
@@ -202,6 +137,7 @@ def __getitem__(self, index):
                 lock.release()
 
             return data
+
         else:
             raise ValueError(f'Version {self._version} not supported')
@@ -299,14 +235,27 @@ def _via_kerchunk(self, index):
         if self.zds is None:
             print(f"Kerchunking file {self.uri} with variable "
                   f"{self.ncvar} for storage type {self.storage_type}")
-            ds = nz.load_netcdf_zarr_generic(self.uri,
-                                             self.ncvar,
-                                             self.storage_type)
+            ds, zarray, zattrs = nz.load_netcdf_zarr_generic(
+                self.uri,
+                self.ncvar,
+                self.storage_type
+            )
             # The following is a hangove from exploration
             # and is needed if using the original doing it ourselves
             # self.zds = make_an_array_instance_active(ds)
             self.zds = ds
 
+            # Retain attributes and other information
+            if zarray.get('fill_value') is not None:
+                zattrs['_FillValue'] = zarray['fill_value']
+
+            self.zarray = zarray
+            self.zattrs = zattrs
+
+            # FIXME: We do not get the correct byte order on the Zarr
+            # Array's dtype when using S3, so capture it here.
+            self._dtype = np.dtype(zarray['dtype'])
+
         return self._get_selection(index)
 
     def _get_selection(self, *args):
@@ -319,7 +268,28 @@ def _get_selection(self, *args):
         compressor = self.zds._compressor
         filters = self.zds._filters
 
-        missing = self._fillvalue, self._missing, self._valid_min, self._valid_max
+        # Get missing values
+        _FillValue = self.zattrs.get('_FillValue')
+        missing_value = self.zattrs.get('missing_value')
+        valid_min = self.zattrs.get('valid_min')
+        valid_max = self.zattrs.get('valid_max')
+        valid_range = self.zattrs.get('valid_range')
+        if valid_max is not None or valid_min is not None:
+            if valid_range is not None:
+                raise ValueError(
+                    "Invalid combination in the file of valid_min, "
+                    "valid_max, valid_range: "
+                    f"{valid_min}, {valid_max}, {valid_range}"
+                )
+        elif valid_range is not None:
+            valid_min, valid_max = valid_range
+
+        missing = (
+            _FillValue,
+            missing_value,
+            valid_min,
+            valid_max,
+        )
 
         indexer = OrthogonalIndexer(*args, self.zds)
         out_shape = indexer.shape
@@ -468,3 +438,37 @@ def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts,
         if drop_axes:
             tmp = np.squeeze(tmp, axis=drop_axes)
         return tmp, out_selection
+
+    def _mask_data(self, data, ds_var):
+        """Mask the data array according to the variable's missing-data attributes."""
+        # TODO: replace with cfdm.NetCDFIndexer, hopefully.
+        attrs = ds_var.attrs
+        missing_value = attrs.get('missing_value')
+        _FillValue = attrs.get('_FillValue')
+        valid_min = attrs.get('valid_min')
+        valid_max = attrs.get('valid_max')
+        valid_range = attrs.get('valid_range')
+
+        if valid_max is not None or valid_min is not None:
+            if valid_range is not None:
+                raise ValueError(
+                    "Invalid combination in the file of valid_min, "
+                    "valid_max, valid_range: "
+                    f"{valid_min}, {valid_max}, {valid_range}"
+                )
+        elif valid_range is not None:
+            valid_min, valid_max = valid_range
+
+        if _FillValue is not None:
+            data = np.ma.masked_equal(data, _FillValue)
+
+        if missing_value is not None:
+            data = np.ma.masked_equal(data, missing_value)
+
+        if valid_max is not None:
+            data = np.ma.masked_greater(data, valid_max)
+
+        if valid_min is not None:
+            data = np.ma.masked_less(data, valid_min)
+
+        return data
diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py
index a8dcb034..782ff729 100644
--- a/activestorage/netcdf_to_zarr.py
+++ b/activestorage/netcdf_to_zarr.py
@@ -10,7 +10,7 @@
 from kerchunk.hdf import SingleHdf5ToZarr
 
 
-def gen_json(file_url, outf, storage_type):
+def gen_json(file_url, varname, outf, storage_type):
     """Generate a json file that contains the kerchunk-ed data for Zarr."""
     if storage_type == "s3":
         fs = s3fs.S3FileSystem(key=S3_ACCESS_KEY,
@@ -24,7 +24,8 @@ def gen_json(file_url, outf, storage_type):
             h5chunks = SingleHdf5ToZarr(s3file, file_url,
                                         inline_threshold=0)
             with fs2.open(outf, 'wb') as f:
-                f.write(ujson.dumps(h5chunks.translate()).encode())
+                content = h5chunks.translate()
+                f.write(ujson.dumps(content).encode())
     else:
         fs = fsspec.filesystem('')
         with fs.open(file_url, 'rb') as local_file:
@@ -43,9 +44,13 @@ def gen_json(file_url, outf, storage_type):
             # faster loading time
             # for active storage, we don't want anything inline
             with fs.open(outf, 'wb') as f:
-                f.write(ujson.dumps(h5chunks.translate()).encode())
+                content = h5chunks.translate()
+                f.write(ujson.dumps(content).encode())
 
-    return outf
+    zarray = ujson.loads(content['refs'][f"{varname}/.zarray"])
+    zattrs = ujson.loads(content['refs'][f"{varname}/.zattrs"])
+
+    return outf, zarray, zattrs
 
 
 def open_zarr_group(out_json, varname):
@@ -60,6 +65,7 @@ def open_zarr_group(out_json, varname):
     mapper = fs.get_mapper("")  # local FS mapper
     #mapper.fs.reference has the kerchunk mapping, how does this propagate into the Zarr array?
     zarr_group = zarr.open_group(mapper)
+
     try:
         zarr_array = getattr(zarr_group, varname)
     except AttributeError as attrerr:
@@ -67,7 +73,7 @@ def open_zarr_group(out_json, varname):
               f"Zarr Group info: {zarr_group.info}")
         raise attrerr
     #print("Zarr array info:", zarr_array.info)
-    
+
     return zarr_array
 
 
@@ -77,10 +83,24 @@ def load_netcdf_zarr_generic(fileloc, varname, storage_type, build_dummy=True):
 
     # Write the Zarr group JSON to a temporary file.
    with tempfile.NamedTemporaryFile() as out_json:
-        gen_json(fileloc, out_json.name, storage_type)
+        _, zarray, zattrs = gen_json(fileloc, varname, out_json.name, storage_type)
 
         # open this monster
         print(f"Attempting to open and convert {fileloc}.")
         ref_ds = open_zarr_group(out_json.name, varname)
 
-    return ref_ds
+    return ref_ds, zarray, zattrs
+
+
+#d = {'version': 1,
+# 'refs': {
+#     '.zgroup': '{"zarr_format":2}',
+#     '.zattrs': '{"Conventions":"CF-1.6","access-list":"grenvillelister simonwilson jeffcole","awarning":"**** THIS SUITE WILL ARCHIVE NON-DUPLEXED DATA TO MOOSE. FOR CRITICAL MODEL RUNS SWITCH TO DUPLEXED IN: postproc --> Post Processing - common settings --> Moose Archiving --> non_duplexed_set. Follow guidance in http:\\/\\/www-twiki\\/Main\\/MassNonDuplexPolicy","branch-date":"1950-01-01","calendar":"360_day","code-version":"UM 11.6, NEMO vn3.6","creation_time":"2022-10-28 12:28","decription":"Initialised from EN4 climatology","description":"Copy of u-ar696\\/trunk@77470","email":"r.k.schieman@reading.ac.uk","end-date":"2015-01-01","experiment-id":"historical","forcing":"AA,BC,CO2","forcing-info":"blah, blah, blah","institution":"NCAS","macro-parent-experiment-id":"historical","macro-parent-experiment-mip":"CMIP","macro-parent-variant-id":"r1i1p1f3","model-id":"HadGEM3-CG31-MM","name":"\\/work\\/n02\\/n02\\/grenvill\\/cylc-run\\/u-cn134\\/share\\/cycle\\/19500101T0000Z\\/3h_","owner":"rosalynhatcher","project":"Coupled Climate","timeStamp":"2022-Oct-28 12:20:33 GMT","title":"[CANARI] GC3.1 N216 ORCA025 UM11.6","uuid":"51e5ef20-d376-4aa6-938e-4c242886b7b1"}',
+#     'lat/.zarray': '{"chunks":[324],"compressor":{"id":"zlib","level":1},"dtype":"
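
With this change, `Active` no longer accepts `missing_value`, `_FillValue`, `valid_min` or `valid_max` at construction: the masking attributes are discovered from the file's own metadata. A minimal usage sketch of the new signature, where the file name `example.nc` and variable name `tas` are hypothetical:

```python
from activestorage.active import Active

# Masking attributes (_FillValue, missing_value, valid_min/max/range) are
# now read from the file's metadata rather than supplied by the caller.
active = Active("example.nc", "tas", storage_type=None, max_threads=100)

# Slicing returns the data with fill, missing and out-of-range values masked.
data = active[0:2]
```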
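The masking consolidated into `_mask_data` follows the usual netCDF conventions: values equal to `_FillValue` or `missing_value` are masked exactly, while `valid_min`/`valid_max` (or the mutually exclusive `valid_range`) bound the valid data. A standalone sketch of the same rules, using made-up attribute values rather than ones read from a file:

```python
import numpy as np

# Made-up attributes of the kind _mask_data reads from a variable.
attrs = {'_FillValue': -999.0, 'missing_value': -1e20, 'valid_range': [0.0, 100.0]}
data = np.array([-999.0, 5.0, 50.0, 150.0, -1e20])

valid_min = attrs.get('valid_min')
valid_max = attrs.get('valid_max')
valid_range = attrs.get('valid_range')

# valid_range may not be combined with valid_min/valid_max.
if valid_min is not None or valid_max is not None:
    if valid_range is not None:
        raise ValueError("valid_range conflicts with valid_min/valid_max")
elif valid_range is not None:
    valid_min, valid_max = valid_range

if attrs.get('_FillValue') is not None:
    data = np.ma.masked_equal(data, attrs['_FillValue'])
if attrs.get('missing_value') is not None:
    data = np.ma.masked_equal(data, attrs['missing_value'])
if valid_max is not None:
    data = np.ma.masked_greater(data, valid_max)
if valid_min is not None:
    data = np.ma.masked_less(data, valid_min)

print(data)  # [-- 5.0 50.0 -- --]
```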
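On the kerchunk path, the variable's `.zarray` and `.zattrs` entries in the reference dict are plain JSON strings, which is what lets `gen_json` hand back the dtype, fill value and attributes without reopening the file. A sketch of that extraction for a local file, again with the hypothetical names `example.nc` and `tas`:

```python
import ujson
from kerchunk.hdf import SingleHdf5ToZarr

with open("example.nc", "rb") as local_file:
    # inline_threshold=0: keep all chunks as references, nothing inlined.
    h5chunks = SingleHdf5ToZarr(local_file, "example.nc", inline_threshold=0)
    content = h5chunks.translate()

# Each variable's Zarr metadata is stored as a JSON string under 'refs'.
zarray = ujson.loads(content['refs']["tas/.zarray"])  # chunks, dtype, fill_value, ...
zattrs = ujson.loads(content['refs']["tas/.zattrs"])  # netCDF attributes

# Kerchunk records the HDF5 fill value on .zarray, so propagate it to the
# attributes, where the masking code expects to find _FillValue.
if zarray.get('fill_value') is not None:
    zattrs['_FillValue'] = zarray['fill_value']
```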