
Commit 5177b06

Merge pull request #241 from NCAS-CMS/new_api_pyfive
Improving API: part 1: functionality for input pyfive.high_level.Dataset
2 parents bd47be8 + 050bc8f commit 5177b06

6 files changed: +174 / -86 lines


.github/workflows/test_s3_minio.yml  (+2 -1)

@@ -6,7 +6,8 @@ on:
   push:
     branches:
       - main # keep this at all times
-      - pyfive
+      # - pyfive # reinstate
+      - new_api_pyfive
   pull_request:
   schedule:
     - cron: '0 0 * * *' # nightly

activestorage/active.py  (+50 -66)

@@ -4,10 +4,12 @@
 import pathlib
 import urllib
 import pyfive
+import s3fs
 import time
-from pyfive.h5d import StoreInfo
 
-import s3fs
+from pathlib import Path
+from pyfive.h5d import StoreInfo
+from typing import Optional
 
 from activestorage.config import *
 from activestorage import reductionist
@@ -47,21 +49,6 @@ def load_from_s3(uri, storage_options=None):
     print(f"Dataset loaded from S3 with s3fs and Pyfive: {uri} ({t2-t1:.2},{t3-t2:.2})")
     return ds
 
-def _metricise(method):
-    """ Decorator for class methods loads into metric_data"""
-    def timed(self, *args, **kw):
-        ts = time.time()
-        metric_name=''
-        if '__metric_name' in kw:
-            metric_name = kw['__metric_name']
-            del kw['__metric_name']
-        result = method(self,*args, **kw)
-        te = time.time()
-        if metric_name:
-            self.metric_data[metric_name] = te-ts
-        return result
-    return timed
-
 
 def get_missing_attributes(ds):
     """"
@@ -122,13 +109,13 @@ def __new__(cls, *args, **kwargs):
 
     def __init__(
         self,
-        uri,
-        ncvar,
-        storage_type=None,
-        max_threads=100,
-        storage_options=None,
-        active_storage_url=None
-    ):
+        dataset: Optional[str | Path | object],
+        ncvar: str = None,
+        storage_type: str = None,
+        max_threads: int = 100,
+        storage_options: dict = None,
+        active_storage_url: str = None
+    ) -> None:
         """
         Instantiate with a NetCDF4 dataset URI and the variable of interest within that file.
         (We need the variable, because we need variable specific metadata from within that
@@ -138,50 +125,69 @@ def __init__(
         :param storage_options: s3fs.S3FileSystem options
         :param active_storage_url: Reductionist server URL
         """
-        # Assume NetCDF4 for now
-        self.uri = uri
-        if self.uri is None:
-            raise ValueError(f"Must use a valid file for uri. Got {uri}")
+        self.ds = None
+        input_variable = False
+        if dataset is None:
+            raise ValueError(f"Must use a valid file name or variable object for dataset. Got {dataset!r}")
+        if isinstance(dataset, Path) and not dataset.exists():
+            raise ValueError(f"Path to input file {dataset!r} does not exist.")
+        if not isinstance(dataset, Path) and not isinstance(dataset, str):
+            print(f"Treating input {dataset} as variable object.")
+            if not type(dataset) is pyfive.high_level.Dataset:
+                raise TypeError(f"Variable object dataset can only be pyfive.high_level.Dataset. Got {dataset!r}")
+            input_variable = True
+            self.ds = dataset
+        self.uri = dataset
 
         # still allow for a passable storage_type
         # for special cases eg "special-POSIX" ie DDN
         if not storage_type and storage_options is not None:
-            storage_type = urllib.parse.urlparse(uri).scheme
+            storage_type = urllib.parse.urlparse(dataset).scheme
         self.storage_type = storage_type
 
+        # set correct filename attr
+        if input_variable and not self.storage_type:
+            self.filename = self.ds
+        elif input_variable and self.storage_type == "s3":
+            self.filename = self.ds.id._filename
+
         # get storage_options
         self.storage_options = storage_options
         self.active_storage_url = active_storage_url
 
         # basic check on file
-        if not os.path.isfile(self.uri) and not self.storage_type:
-            raise ValueError(f"Must use existing file for uri. {self.uri} not found")
+        if not input_variable:
+            if not os.path.isfile(self.uri) and not self.storage_type:
+                raise ValueError(f"Must use existing file for uri. {self.uri} not found")
 
         self.ncvar = ncvar
-        if self.ncvar is None:
+        if self.ncvar is None and not input_variable:
            raise ValueError("Must set a netCDF variable name to slice")
 
         self._version = 1
         self._components = False
         self._method = None
         self._max_threads = max_threads
         self.missing = None
-        self.ds = None
-        self.metric_data = {}
         self.data_read = 0
 
-    @_metricise
     def __load_nc_file(self):
-        """ Get the netcdf file and it's b-tree"""
+        """
+        Get the netcdf file and its b-tree.
+
+        This private method is used only if the input to Active
+        is not a pyfive.high_level.Dataset object. In that case,
+        any file opening is skipped, and ncvar is not used. The
+        Dataset object will have already contained the b-tree,
+        and `_filename` attribute.
+        """
         ncvar = self.ncvar
-        # in all cases we need an open netcdf file to get at attributes
-        # we keep it open because we need it's b-tree
         if self.storage_type is None:
             nc = pyfive.File(self.uri)
         elif self.storage_type == "s3":
             nc = load_from_s3(self.uri, self.storage_options)
             self.filename = self.uri
-
         self.ds = nc[ncvar]
 
     def __get_missing_attributes(self):
@@ -194,10 +200,8 @@ def __getitem__(self, index):
         Provides support for a standard get item.
         #FIXME-BNL: Why is the argument index?
         """
-        self.metric_data = {}
         if self.ds is None:
-            self.__load_nc_file(__metric_name='load nc time')
-            #self.__metricise('Load','__load_nc_file')
+            self.__load_nc_file()
 
         self.missing = self.__get_missing_attributes()
 
@@ -206,21 +210,20 @@ def __getitem__(self, index):
         if self.method is None and self._version == 0:
 
             # No active operation
-            return self._get_vanilla(index, __metric_name='vanilla_time')
+            return self._get_vanilla(index)
 
         elif self._version == 1:
 
             #FIXME: is the difference between version 1 and 2 still honoured?
-            return self._get_selection(index, __metric_name='selection 1 time (s)')
+            return self._get_selection(index)
 
         elif self._version == 2:
 
-            return self._get_selection(index, __metric_name='selection 2 time (s)')
+            return self._get_selection(index)
 
         else:
             raise ValueError(f'Version {self._version} not supported')
 
-    @_metricise
     def _get_vanilla(self, index):
         """
         Get the data without any active operation
@@ -294,7 +297,7 @@ def _get_active(self, method, *args):
         """
         raise NotImplementedError
 
-    @_metricise
+
     def _get_selection(self, *args):
         """
         At this point we have a Dataset object, but all the important information about
@@ -307,13 +310,8 @@ def _get_selection(self, *args):
 
         name = self.ds.name
         dtype = np.dtype(self.ds.dtype)
-        # hopefully fix pyfive to get a dtype directly
         array = pyfive.indexing.ZarrArrayStub(self.ds.shape, self.ds.chunks)
         ds = self.ds.id
-
-        self.metric_data['args'] = args
-        self.metric_data['dataset shape'] = self.ds.shape
-        self.metric_data['dataset chunks'] = self.ds.chunks
         if ds.filter_pipeline is None:
             compressor, filters = None, None
         else:
@@ -359,16 +357,6 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype, compressor, f
             session = None
 
         # Process storage chunks using a thread pool.
-        # Because we do this, we need to read the dataset b-tree now, not as we go, so
-        # it is already in cache. If we remove the thread pool from here, we probably
-        # wouldn't need to do it before the first one.
-
-        if ds.chunks is not None:
-            t1 = time.time()
-            # ds._get_chunk_addresses()
-            t2 = time.time() - t1
-            self.metric_data['indexing time (s)'] = t2
-            # self.metric_data['chunk number'] = len(ds._zchunk_index)
         chunk_count = 0
         t1 = time.time()
         with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_threads) as executor:
@@ -433,10 +421,6 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype, compressor, f
             # size.
             out = out / np.sum(counts).reshape(shape1)
 
-        t2 = time.time()
-        self.metric_data['reduction time (s)'] = t2-t1
-        self.metric_data['chunks processed'] = chunk_count
-        self.metric_data['storage read (B)'] = self.data_read
         return out
 
     def _get_endpoint_url(self):
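
Taken together, the active.py changes above let Active be constructed either from a file path plus a netCDF variable name (the existing route) or directly from an already open pyfive.high_level.Dataset, in which case ncvar is not required and Active skips opening the file itself. A minimal usage sketch follows; the file name "example_data.nc" and variable name "tas" are placeholders for illustration, not part of this commit.

import pyfive
from activestorage.active import Active

# Existing route: path plus variable name; Active opens the file itself.
active = Active("example_data.nc", ncvar="tas")  # hypothetical local file/variable
active._method = "min"
result_from_path = active[0:2, :, :]

# New route in this commit: pass a pyfive.high_level.Dataset directly;
# ncvar is not needed and no extra file open happens inside Active.
ds = pyfive.File("example_data.nc")["tas"]  # pyfive.high_level.Dataset
active = Active(ds)
active._method = "min"
result_from_dataset = active[0:2, :, :]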

activestorage/storage.py  (+35 -14)

@@ -1,8 +1,10 @@
 """Active storage module."""
 import numpy as np
+import pyfive
 
 from numcodecs.compat import ensure_ndarray
 
+
 def reduce_chunk(rfile,
                  offset, size, compression, filters, missing, dtype, shape,
                  order, chunk_selection, method=None):
@@ -25,20 +27,39 @@ def reduce_chunk(rfile,
     storage implementations we'll change to controlled vocabulary)
 
     """
-
-    #FIXME: for the moment, open the file every time ... we might want to do that, or not
-    with open(rfile,'rb') as open_file:
-        # get the data
-        chunk = read_block(open_file, offset, size)
-        # reverse any compression and filters
-        chunk = filter_pipeline(chunk, compression, filters)
-        # make it a numpy array of bytes
-        chunk = ensure_ndarray(chunk)
-        # convert to the appropriate data type
-        chunk = chunk.view(dtype)
-        # sort out ordering and convert to the parent hyperslab dimensions
-        chunk = chunk.reshape(-1, order='A')
-        chunk = chunk.reshape(shape, order=order)
+    obj_type = type(rfile)
+    print(f"Reducing chunk of object {obj_type}")
+
+    if not obj_type is pyfive.high_level.Dataset:
+        #FIXME: for the moment, open the file every time ... we might want to do that, or not
+        # we could just use an instance of pyfive.high_level.Dataset.id
+        # passed directly from active.py, as below
+        with open(rfile,'rb') as open_file:
+            # get the data
+            chunk = read_block(open_file, offset, size)
+            # reverse any compression and filters
+            chunk = filter_pipeline(chunk, compression, filters)
+            # make it a numpy array of bytes
+            chunk = ensure_ndarray(chunk)
+            # convert to the appropriate data type
+            chunk = chunk.view(dtype)
+            # sort out ordering and convert to the parent hyperslab dimensions
+            chunk = chunk.reshape(-1, order='A')
+            chunk = chunk.reshape(shape, order=order)
+    else:
+        class storeinfo: pass
+        storeinfo.byte_offset = offset
+        storeinfo.size = size
+        chunk = rfile.id._get_raw_chunk(storeinfo)
+        # reverse any compression and filters
+        chunk = filter_pipeline(chunk, compression, filters)
+        # make it a numpy array of bytes
+        chunk = ensure_ndarray(chunk)
+        # convert to the appropriate data type
+        chunk = chunk.view(dtype)
+        # sort out ordering and convert to the parent hyperslab dimensions
+        chunk = chunk.reshape(-1, order='A')
+        chunk = chunk.reshape(shape, order=order)
 
     tmp = chunk[chunk_selection]
     if method:
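
In the new else branch, reduce_chunk no longer reopens the file: when rfile is a pyfive.high_level.Dataset, it asks the dataset's low-level id object for the raw (still compressed and filtered) chunk bytes, passing an object that carries the byte_offset and size of the storage chunk. The sketch below shows the same lookup with types.SimpleNamespace in place of the ad-hoc storeinfo class; it assumes, as the commit does, that _get_raw_chunk only needs those two attributes.

from types import SimpleNamespace

import pyfive

def raw_chunk_from_dataset(dataset: pyfive.high_level.Dataset, offset: int, size: int):
    """Fetch the raw (compressed/filtered) bytes of one storage chunk.

    SimpleNamespace stands in for the ad-hoc `class storeinfo` above;
    only byte_offset and size are set, mirroring the diff.
    """
    storeinfo = SimpleNamespace(byte_offset=offset, size=size)
    return dataset.id._get_raw_chunk(storeinfo)

Since active.py already imports pyfive.h5d.StoreInfo, a follow-up could pass a real StoreInfo rather than a stand-in object, as the in-code FIXME hints.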

tests/test_real_s3.py  (+42)

@@ -0,0 +1,42 @@
+import os
+import numpy as np
+
+from activestorage.active import Active
+from activestorage.active import load_from_s3
+
+S3_BUCKET = "bnl"
+
+# TODO Remove after full testing and right before deployment
+def test_s3_dataset():
+    """Run somewhat as the 'gold' test."""
+    storage_options = {
+        'key': "f2d55c6dcfc7618b2c34e00b58df3cef",
+        'secret': "$/'#M{0{/4rVhp%n^(XeX$q@y#&(NM3W1->~N.Q6VP.5[@bLpi='nt]AfH)>78pT",
+        'client_kwargs': {'endpoint_url': "https://uor-aces-o.s3-ext.jc.rl.ac.uk"},  # old proxy
+        # 'client_kwargs': {'endpoint_url': "https://uor-aces-o.ext.proxy.jc.rl.ac.uk"},  # new proxy
+    }
+    active_storage_url = "https://192.171.169.113:8080"
+    # bigger_file = "ch330a.pc19790301-bnl.nc"  # 18GB 3400 HDF5 chunks
+    bigger_file = "ch330a.pc19790301-def.nc"  # 17GB 64 HDF5 chunks
+    # bigger_file = "da193a_25_day__198808-198808.nc"  # 3GB 30 HDF5 chunks
+
+    test_file_uri = os.path.join(
+        S3_BUCKET,
+        bigger_file
+    )
+    print("S3 Test file path:", test_file_uri)
+    dataset = load_from_s3(test_file_uri, storage_options=storage_options)
+    av = dataset['UM_m01s16i202_vn1106']
+
+    # big file bnl: 18GB/3400 HDF5 chunks; def: 17GB/64 HDF5 chunks
+    active = Active(av, storage_type="s3",
+                    storage_options=storage_options,
+                    active_storage_url=active_storage_url)
+    active._version = 2
+    active._method = "min"
+
+    # result = active[:]
+    result = active[0:3, 4:6, 7:9]  # standardized slice
+
+    print("Result is", result)
+    assert result == 5098.625
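
The asserted value 5098.625 is hard coded. A hedged cross-check, assuming the same bucket, credentials, and variable remain reachable, is to compute the same reduction from a plain pyfive read of the slice (this pulls the whole slice over S3 rather than reducing it server side):

import numpy as np

from activestorage.active import load_from_s3

def expected_min(test_file_uri, storage_options):
    """Compute the min over the same slice with a plain pyfive read."""
    dataset = load_from_s3(test_file_uri, storage_options=storage_options)
    av = dataset['UM_m01s16i202_vn1106']
    # Should agree with the value asserted by test_s3_dataset above.
    return np.min(av[0:3, 4:6, 7:9])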
