Commit bb769e5

Merge pull request #180 from valeriupredoi/missing-dtype

Get missing values and dtype without re-opening the dataset

2 parents b31ca82 + 53f4ed4

7 files changed: +191 -96 lines

activestorage/active.py (+78 -74)
@@ -68,7 +68,7 @@ def __new__(cls, *args, **kwargs):
         }
         return instance
 
-    def __init__(self, uri, ncvar, storage_type=None, missing_value=None, _FillValue=None, valid_min=None, valid_max=None, max_threads=100):
+    def __init__(self, uri, ncvar, storage_type=None, max_threads=100):
         """
         Instantiate with a NetCDF4 dataset and the variable of interest within that file.
         (We need the variable, because we need variable specific metadata from within that
@@ -92,65 +92,6 @@ def __init__(self, uri, ncvar, storage_type=None, missing_value=None, _FillValue
         self._method = None
         self._lock = False
         self._max_threads = max_threads
-
-        # obtain metadata, using netcdf4_python for now
-        # FIXME: There is an outstanding issue with ._FilLValue to be handled.
-        # If the user actually wrote the data with no fill value, or the
-        # default fill value is in play, then this might go wrong.
-        if storage_type is None:
-            ds = Dataset(uri)
-        elif storage_type == "s3":
-            with load_from_s3(uri) as _ds:
-                ds = _ds
-        try:
-            ds_var = ds[ncvar]
-        except IndexError as exc:
-            print(f"Dataset {ds} does not contain ncvar {ncvar!r}.")
-            raise exc
-
-        # FIXME: We do not get the correct byte order on the Zarr Array's dtype
-        # when using S3, so capture it here.
-        self._dtype = ds_var.dtype
-
-        if (missing_value, _FillValue, valid_min, valid_max) == (None, None, None, None):
-            if isinstance(ds, Dataset):
-                self._missing = getattr(ds_var, 'missing_value', None)
-                self._fillvalue = getattr(ds_var, '_FillValue', None)
-                # could be fill_value set as netCDF4 attr
-                if self._fillvalue is None:
-                    self._fillvalue = getattr(ds_var, 'fill_value', None)
-                valid_min = getattr(ds_var, 'valid_min', None)
-                valid_max = getattr(ds_var, 'valid_max', None)
-                valid_range = getattr(ds_var, 'valid_range', None)
-            elif storage_type == "s3":
-                self._missing = ds_var.attrs.get('missing_value')
-                self._fillvalue = ds_var.attrs.get('_FillValue')
-                # could be fill_value set as netCDF4 attr
-                if self._fillvalue is None:
-                    self._fillvalue = ds_var.attrs.get('fill_value')
-                valid_min = ds_var.attrs.get('valid_min')
-                valid_max = ds_var.attrs.get('valid_max')
-                valid_range = ds_var.attrs.get('valid_range')
-
-            if valid_max is not None or valid_min is not None:
-                if valid_range is not None:
-                    raise ValueError(
-                        "Invalid combination in the file of valid_min, "
-                        "valid_max, valid_range: "
-                        f"{valid_min}, {valid_max}, {valid_range}"
-                    )
-                valid_range = (valid_min, valid_max)
-            elif valid_range is None:
-                valid_range = (None, None)
-            self._valid_min, self._valid_max = valid_range
-
-        else:
-            self._missing = missing_value
-            self._fillvalue = _FillValue
-            self._valid_min = valid_min
-            self._valid_max = valid_max
-
-        ds.close()
 
     def __getitem__(self, index):
         """
@@ -174,22 +115,16 @@ def __getitem__(self, index):
             elif self.storage_type == "s3":
                 with load_from_s3(self.uri) as nc:
                     data = nc[ncvar][index]
-                    # h5netcdf doesn't return masked arrays.
-                    if self._fillvalue:
-                        data = np.ma.masked_equal(data, self._fillvalue)
-                    if self._missing:
-                        data = np.ma.masked_equal(data, self._missing)
-                    if self._valid_max:
-                        data = np.ma.masked_greater(data, self._valid_max)
-                    if self._valid_min:
-                        data = np.ma.masked_less(data, self._valid_min)
+                    data = self._mask_data(data, nc[ncvar])
 
             if lock:
                 lock.release()
-
+
             return data
+
         elif self._version == 1:
             return self._via_kerchunk(index)
+
         elif self._version == 2:
             # No active operation either
             lock = self.lock
@@ -202,6 +137,7 @@ def __getitem__(self, index):
                 lock.release()
 
             return data
+
         else:
             raise ValueError(f'Version {self._version} not supported')
 
@@ -299,14 +235,27 @@ def _via_kerchunk(self, index):
         if self.zds is None:
             print(f"Kerchunking file {self.uri} with variable "
                   f"{self.ncvar} for storage type {self.storage_type}")
-            ds = nz.load_netcdf_zarr_generic(self.uri,
-                                             self.ncvar,
-                                             self.storage_type)
+            ds, zarray, zattrs = nz.load_netcdf_zarr_generic(
+                self.uri,
+                self.ncvar,
+                self.storage_type
+            )
             # The following is a hangover from exploration
             # and is needed if using the original doing it ourselves
             # self.zds = make_an_array_instance_active(ds)
             self.zds = ds
 
+            # Retain attributes and other information
+            if zarray.get('fill_value') is not None:
+                zattrs['_FillValue'] = zarray['fill_value']
+
+            self.zarray = zarray
+            self.zattrs = zattrs
+
+            # FIXME: We do not get the correct byte order on the Zarr
+            # Array's dtype when using S3, so capture it here.
+            self._dtype = np.dtype(zarray['dtype'])
+
         return self._get_selection(index)
 
     def _get_selection(self, *args):
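
The dtype is now reconstructed from the kerchunk '.zarray' metadata instead of a re-opened dataset. Because the '.zarray' dtype string carries an explicit byte-order prefix, np.dtype() on it sidesteps the S3 byte-order problem flagged in the FIXME. A small sketch (the dtype and fill value mirror the example refs at the bottom of netcdf_to_zarr.py):

    import numpy as np

    # a kerchunk-style .zarray entry
    zarray = {"dtype": "<f4", "fill_value": -1073741824.0}

    dtype = np.dtype(zarray["dtype"])  # explicit little-endian float32
    assert dtype.itemsize == 4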
@@ -319,7 +268,28 @@ def _get_selection(self, *args):
         compressor = self.zds._compressor
         filters = self.zds._filters
 
-        missing = self._fillvalue, self._missing, self._valid_min, self._valid_max
+        # Get missing values
+        _FillValue = self.zattrs.get('_FillValue')
+        missing_value = self.zattrs.get('missing_value')
+        valid_min = self.zattrs.get('valid_min')
+        valid_max = self.zattrs.get('valid_max')
+        valid_range = self.zattrs.get('valid_range')
+        if valid_max is not None or valid_min is not None:
+            if valid_range is not None:
+                raise ValueError(
+                    "Invalid combination in the file of valid_min, "
+                    "valid_max, valid_range: "
+                    f"{valid_min}, {valid_max}, {valid_range}"
+                )
+        elif valid_range is not None:
+            valid_min, valid_max = valid_range
+
+        missing = (
+            _FillValue,
+            missing_value,
+            valid_min,
+            valid_max,
+        )
 
         indexer = OrthogonalIndexer(*args, self.zds)
         out_shape = indexer.shape
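
The precedence rule above follows the netCDF attribute conventions: valid_range may not be combined with valid_min/valid_max, and on its own it expands to the pair. The same logic as a standalone sketch (resolve_valid_range is a hypothetical helper, not part of this PR):

    def resolve_valid_range(valid_min, valid_max, valid_range):
        """Resolve valid_min/valid_max/valid_range into a (min, max) pair."""
        if valid_max is not None or valid_min is not None:
            if valid_range is not None:
                # the conventions forbid mixing valid_range with valid_min/max
                raise ValueError("valid_range cannot be combined with valid_min/valid_max")
        elif valid_range is not None:
            valid_min, valid_max = valid_range
        return valid_min, valid_max

    assert resolve_valid_range(None, None, [750., 850.]) == (750., 850.)
    assert resolve_valid_range(500, None, None) == (500, None)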
@@ -468,3 +438,37 @@ def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts,
         if drop_axes:
             tmp = np.squeeze(tmp, axis=drop_axes)
         return tmp, out_selection
+
+    def _mask_data(self, data, ds_var):
+        """Mask data according to the variable's missing-value metadata."""
+        # TODO: replace with cfdm.NetCDFIndexer, hopefully.
+        attrs = ds_var.attrs
+        missing_value = attrs.get('missing_value')
+        _FillValue = attrs.get('_FillValue')
+        valid_min = attrs.get('valid_min')
+        valid_max = attrs.get('valid_max')
+        valid_range = attrs.get('valid_range')
+
+        if valid_max is not None or valid_min is not None:
+            if valid_range is not None:
+                raise ValueError(
+                    "Invalid combination in the file of valid_min, "
+                    "valid_max, valid_range: "
+                    f"{valid_min}, {valid_max}, {valid_range}"
+                )
+        elif valid_range is not None:
+            valid_min, valid_max = valid_range
+
+        if _FillValue is not None:
+            data = np.ma.masked_equal(data, _FillValue)
+
+        if missing_value is not None:
+            data = np.ma.masked_equal(data, missing_value)
+
+        if valid_max is not None:
+            data = np.ma.masked_greater(data, valid_max)
+
+        if valid_min is not None:
+            data = np.ma.masked_less(data, valid_min)
+
+        return data
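
Since h5netcdf returns plain (unmasked) arrays, _mask_data applies the masking that netCDF4-python would normally do on read. A quick demonstration of the numpy.ma calls it relies on (values are illustrative):

    import numpy as np

    data = np.array([1.0, 999.0, 450.0, 600.0])

    # _FillValue / missing_value mask exact matches;
    # valid_min / valid_max mask out-of-range values.
    data = np.ma.masked_equal(data, 999.0)  # -> [1.0, --, 450.0, 600.0]
    data = np.ma.masked_less(data, 500.0)   # -> [--, --, --, 600.0]

    assert data.tolist() == [None, None, None, 600.0]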

activestorage/netcdf_to_zarr.py (+27 -7)
@@ -10,7 +10,7 @@
 from kerchunk.hdf import SingleHdf5ToZarr
 
 
-def gen_json(file_url, outf, storage_type):
+def gen_json(file_url, varname, outf, storage_type):
     """Generate a json file that contains the kerchunk-ed data for Zarr."""
     if storage_type == "s3":
         fs = s3fs.S3FileSystem(key=S3_ACCESS_KEY,
@@ -24,7 +24,8 @@ def gen_json(file_url, outf, storage_type):
             h5chunks = SingleHdf5ToZarr(s3file, file_url,
                                         inline_threshold=0)
             with fs2.open(outf, 'wb') as f:
-                f.write(ujson.dumps(h5chunks.translate()).encode())
+                content = h5chunks.translate()
+                f.write(ujson.dumps(content).encode())
     else:
         fs = fsspec.filesystem('')
         with fs.open(file_url, 'rb') as local_file:
@@ -43,9 +44,13 @@ def gen_json(file_url, outf, storage_type):
             # faster loading time
             # for active storage, we don't want anything inline
             with fs.open(outf, 'wb') as f:
-                f.write(ujson.dumps(h5chunks.translate()).encode())
+                content = h5chunks.translate()
+                f.write(ujson.dumps(content).encode())
 
-    return outf
+    zarray = ujson.loads(content['refs'][f"{varname}/.zarray"])
+    zattrs = ujson.loads(content['refs'][f"{varname}/.zattrs"])
+
+    return outf, zarray, zattrs
 
 
 def open_zarr_group(out_json, varname):
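
The values under kerchunk's 'refs' mapping are JSON-encoded strings rather than dicts, which is why gen_json now passes them through ujson.loads before returning. A cut-down sketch (the variable name and attribute values are illustrative; the key layout follows the commented example refs at the end of this file):

    import ujson

    content = {
        "version": 1,
        "refs": {
            "ta/.zarray": '{"dtype":"<f4","fill_value":-1073741824.0,"shape":[80,324,432],"zarr_format":2}',
            "ta/.zattrs": '{"missing_value":-1073741824.0,"units":"K"}',
        },
    }

    varname = "ta"
    zarray = ujson.loads(content["refs"][f"{varname}/.zarray"])
    zattrs = ujson.loads(content["refs"][f"{varname}/.zattrs"])
    assert zarray["dtype"] == "<f4" and zattrs["units"] == "K"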
@@ -60,14 +65,15 @@ def open_zarr_group(out_json, varname):
     mapper = fs.get_mapper("") # local FS mapper
     #mapper.fs.reference has the kerchunk mapping, how does this propagate into the Zarr array?
     zarr_group = zarr.open_group(mapper)
+
     try:
         zarr_array = getattr(zarr_group, varname)
     except AttributeError as attrerr:
         print(f"Zarr Group does not contain variable {varname}. "
               f"Zarr Group info: {zarr_group.info}")
         raise attrerr
     #print("Zarr array info:", zarr_array.info)
-
+
     return zarr_array
 
 
@@ -77,10 +83,24 @@ def load_netcdf_zarr_generic(fileloc, varname, storage_type, build_dummy=True):
 
     # Write the Zarr group JSON to a temporary file.
     with tempfile.NamedTemporaryFile() as out_json:
-        gen_json(fileloc, out_json.name, storage_type)
+        _, zarray, zattrs = gen_json(fileloc, varname, out_json.name, storage_type)
 
         # open this monster
         print(f"Attempting to open and convert {fileloc}.")
         ref_ds = open_zarr_group(out_json.name, varname)
 
-    return ref_ds
+    return ref_ds, zarray, zattrs
+
+
+#d = {'version': 1,
+# 'refs': {
+#    '.zgroup': '{"zarr_format":2}',
+#    '.zattrs': '{"Conventions":"CF-1.6","access-list":"grenvillelister simonwilson jeffcole","awarning":"**** THIS SUITE WILL ARCHIVE NON-DUPLEXED DATA TO MOOSE. FOR CRITICAL MODEL RUNS SWITCH TO DUPLEXED IN: postproc --> Post Processing - common settings --> Moose Archiving --> non_duplexed_set. Follow guidance in http:\\/\\/www-twiki\\/Main\\/MassNonDuplexPolicy","branch-date":"1950-01-01","calendar":"360_day","code-version":"UM 11.6, NEMO vn3.6","creation_time":"2022-10-28 12:28","decription":"Initialised from EN4 climatology","description":"Copy of u-ar696\\/trunk@77470","email":"[email protected]","end-date":"2015-01-01","experiment-id":"historical","forcing":"AA,BC,CO2","forcing-info":"blah, blah, blah","institution":"NCAS","macro-parent-experiment-id":"historical","macro-parent-experiment-mip":"CMIP","macro-parent-variant-id":"r1i1p1f3","model-id":"HadGEM3-CG31-MM","name":"\\/work\\/n02\\/n02\\/grenvill\\/cylc-run\\/u-cn134\\/share\\/cycle\\/19500101T0000Z\\/3h_","owner":"rosalynhatcher","project":"Coupled Climate","timeStamp":"2022-Oct-28 12:20:33 GMT","title":"[CANARI] GC3.1 N216 ORCA025 UM11.6","uuid":"51e5ef20-d376-4aa6-938e-4c242886b7b1"}',
+    'lat/.zarray': '{"chunks":[324],"compressor":{"id":"zlib","level":1},"dtype":"<f4","fill_value":null,"filters":[{"elementsize":4,"id":"shuffle"}],"order":"C","shape":[324],"zarr_format":2}', 'lat/.zattrs': '{"_ARRAY_DIMENSIONS":["lat"],"axis":"Y","long_name":"Latitude","standard_name":"latitude","units":"degrees_north"}',
+#    'lat/0': ['/home/david/Downloads/3h__19500101-19500110.nc', 26477, 560],
+#    'lon/.zarray': '{"chunks":[432],"compressor":{"id":"zlib","level":1},"dtype":"<f4","fill_value":null,"filters":[{"elementsize":4,"id":"shuffle"}],"order":"C","shape":[432],"zarr_format":2}',
+#    'lon/.zattrs': '{"_ARRAY_DIMENSIONS":["lon"],"axis":"X","long_name":"Longitude","standard_name":"longitude","units":"degrees_east"}',
+#    'lon/0': ['/home/david/Downloads/3h__19500101-19500110.nc', 27037, 556],
+#    'm01s00i507_10/.zarray': '{"chunks":[1,324,432],"compressor":{"id":"zlib","level":1},"dtype":"<f4","fill_value":-1073741824.0,"filters":[{"elementsize":4,"id":"shuffle"}],"order":"C","shape":[80,324,432],"zarr_format":2}',
+#    'm01s00i507_10/.zattrs': '{"_ARRAY_DIMENSIONS":["time_counter","lat","lon"],"cell_methods":"time: mean (interval: 900 s)","coordinates":"time_centered","interval_offset":"0ts","interval_operation":"900 s","interval_write":"3 h","long_name":"OPEN SEA SURFACE TEMP AFTER TIMESTEP","missing_value":-1073741824.0,"online_operation":"average","standard_name":"surface_temperature","units":"K"}',
+# }}
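
For context, a reference file like the one sketched in the comment above is what open_zarr_group consumes: fsspec's "reference" filesystem resolves the byte ranges and zarr reads the result as a group. Roughly (a sketch only; the JSON filename is illustrative and open_zarr_group's exact filesystem setup is not shown in these hunks):

    import fsspec
    import zarr

    fs = fsspec.filesystem("reference", fo="out.json",  # JSON written by gen_json
                           remote_protocol="file")
    mapper = fs.get_mapper("")
    zarr_group = zarr.open_group(mapper)
    zarr_array = getattr(zarr_group, "ta")  # AttributeError if the variable is absent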

tests/test_bigger_data.py (+5 -4)
@@ -149,7 +149,7 @@ def test_native_emac_model_fails(test_data_path):
     """
     An example of netCDF file that doesn't work
 
-    The actual issue is with h5py - it can't read it (netCDF classic)
+    The actual issue is with h5py - it can't read it (netCDF3 classic)
 
     h5py/_objects.pyx:54: in h5py._objects.with_phil.wrapper
     ???
@@ -175,8 +175,9 @@ def test_native_emac_model_fails(test_data_path):
         pass
 
     if USE_S3:
+        active = Active(uri, "aps_ave", utils.get_storage_type())
         with pytest.raises(OSError):
-            active = Active(uri, "aps_ave", utils.get_storage_type())
+            active[...]
     else:
         active = Active(uri, "aps_ave")
         active._version = 2
@@ -243,13 +244,13 @@ def test_daily_data_masked(test_data_path):
     """
     ncfile = str(test_data_path / "daily_data_masked.nc")
    uri = utils.write_to_storage(ncfile)
-    active = Active(uri, "ta", utils.get_storage_type(), missing_value=999.)
+    active = Active(uri, "ta", utils.get_storage_type())
     active._version = 0
     d = active[:]
     d = np.ma.masked_where(d==999., d)
     mean_result = np.ma.mean(d)
 
-    active = Active(uri, "ta", utils.get_storage_type(), missing_value=999.)
+    active = Active(uri, "ta", utils.get_storage_type())
     active._version = 2
     active.method = "mean"
     active.components = True
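
Both test updates follow from the lazy metadata change: Active.__init__ no longer opens the dataset, so the unreadable netCDF3-classic file raises OSError only on first access (hence pytest.raises now wraps active[...] rather than the constructor), and test_daily_data_masked no longer passes missing_value=999. because the missing value is now read from the file's own attributes.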

tests/test_data/daily_data_masked.nc (105 Bytes, binary file not shown)

tests/test_missing.py (+62 -0)
@@ -7,6 +7,8 @@
 import tempfile
 import unittest
 
+import h5netcdf
+
 from netCDF4 import Dataset
 
 from activestorage.active import Active
@@ -240,3 +242,63 @@ def test_validrange(tmp_path):
 
     np.testing.assert_array_equal(masked_numpy_mean, active_mean)
     np.testing.assert_array_equal(no_active_mean, active_mean)
+
+
+def test_active_mask_data(tmp_path):
+    testfile = str(tmp_path / 'test_partially_missing_data.nc')
+
+    # with valid min
+    r = dd.make_validmin_ncdata(testfile, valid_min=500)
+
+    # retrieve the actual numpy-ed result
+    actual_data = load_dataset(testfile)
+
+    # dataset variable
+    ds = h5netcdf.File(testfile, 'r', invalid_netcdf=True)
+    dsvar = ds["data"]
+
+    # test the function
+    data = Active._mask_data(None, actual_data, dsvar)
+    ds.close()
+
+    # with valid range
+    r = dd.make_validrange_ncdata(testfile, valid_range=[750., 850.])
+
+    # retrieve the actual numpy-ed result
+    actual_data = load_dataset(testfile)
+
+    # dataset variable
+    ds = h5netcdf.File(testfile, 'r', invalid_netcdf=True)
+    dsvar = ds["data"]
+
+    # test the function
+    data = Active._mask_data(None, actual_data, dsvar)
+    ds.close()
+
+    # with missing
+    r = dd.make_missing_ncdata(testfile)
+
+    # retrieve the actual numpy-ed result
+    actual_data = load_dataset(testfile)
+
+    # dataset variable
+    ds = h5netcdf.File(testfile, 'r', invalid_netcdf=True)
+    dsvar = ds["data"]
+
+    # test the function
+    data = Active._mask_data(None, actual_data, dsvar)
+    ds.close()
+
+    # with _FillValue
+    r = dd.make_fillvalue_ncdata(testfile)
+
+    # retrieve the actual numpy-ed result
+    actual_data = load_dataset(testfile)
+
+    # dataset variable
+    ds = h5netcdf.File(testfile, 'r', invalid_netcdf=True)
+    dsvar = ds["data"]
+
+    # test the function
+    data = Active._mask_data(None, actual_data, dsvar)
+    ds.close()
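
The new test calls _mask_data unbound, passing None for self, once per metadata flavour. The four near-identical blocks could be collapsed with pytest's parametrize; a hypothetical sketch (assumes the module's existing imports of dd, load_dataset, Active and np):

    import h5netcdf
    import pytest

    @pytest.mark.parametrize("maker", [
        lambda f: dd.make_validmin_ncdata(f, valid_min=500),
        lambda f: dd.make_validrange_ncdata(f, valid_range=[750., 850.]),
        dd.make_missing_ncdata,
        dd.make_fillvalue_ncdata,
    ])
    def test_active_mask_data_parametrized(tmp_path, maker):
        testfile = str(tmp_path / 'test_partially_missing_data.nc')
        maker(testfile)
        actual_data = load_dataset(testfile)
        with h5netcdf.File(testfile, 'r', invalid_netcdf=True) as ds:
            data = Active._mask_data(None, actual_data, ds["data"])
        # each maker sets at least one masking attribute, so a masked
        # array should come back
        assert isinstance(data, np.ma.MaskedArray)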
