From 97d6360a16e370724db3654802cd98083945d3f0 Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Tue, 25 Feb 2025 16:48:36 +0000
Subject: [PATCH 01/30] start api changes

---
 activestorage/active.py | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/activestorage/active.py b/activestorage/active.py
index 761cf3b6..ee5db132 100644
--- a/activestorage/active.py
+++ b/activestorage/active.py
@@ -167,10 +167,10 @@ def __init__(
         self._max_threads = max_threads
         self.missing = None
         self.ds = None
+        self.netCDF4Dataset = None
         self.metric_data = {}
         self.data_read = 0

-    @_metricise
     def __load_nc_file(self):
         """ Get the netcdf file and it's b-tree"""
         ncvar = self.ncvar
@@ -183,6 +183,11 @@ def __load_nc_file(self):
         self.filename = self.uri
         self.ds = nc[ncvar]
+        return self.ds
+
+    def _netCDF4Dataset(self):
+        if not self.netCDF4Dataset:
+            return self.__load_nc_file()

     def __get_missing_attributes(self):
         if self.ds is None:
@@ -566,3 +571,16 @@ def _mask_data(self, data):
             data = np.ma.masked_less(data, valid_min)

         return data
+
+
+class ActiveVariable():
+    """
+    A netCDF4.Dataset-like variable built on top of the
+    Active class. It preserves all properties and methods
+    a regular netCDF4.Dataset has, but some of them are custom
+    built with Active Storage functionality in mind.
+    """
+    def __init__(self, active):
+        self.ds = active._netCDF4Dataset()
+
+
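Note on the ActiveVariable wrapper above -- a usage sketch, not part of
the patch series itself; the file path, variable name and the (12, 4, 8)
shape are the ones asserted by the unit test added in the next patch:

    from activestorage.active import Active, ActiveVariable

    # Wrapping an Active instance calls _netCDF4Dataset(), which lazily
    # opens the file with pyfive and returns the variable object.
    active = Active("tests/test_data/cesm2_native.nc", ncvar="TREFHT")
    av = ActiveVariable(active)
    print(av.ds.shape)  # (12, 4, 8)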
From d4f0588eda1a1d4961cfb37aa9e0f4a9a4b817eb Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Tue, 25 Feb 2025 16:48:52 +0000
Subject: [PATCH 02/30] start api changes

---
 tests/unit/test_active.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/tests/unit/test_active.py b/tests/unit/test_active.py
index 25f988cc..413ec962 100644
--- a/tests/unit/test_active.py
+++ b/tests/unit/test_active.py
@@ -3,7 +3,7 @@
 import pytest
 import threading

-from activestorage.active import Active
+from activestorage.active import Active, ActiveVariable
 from activestorage.active import load_from_s3
 from activestorage.config import *
 from botocore.exceptions import EndpointConnectionError as botoExc
@@ -81,6 +81,14 @@ def test_active():
     init = active.__init__(uri=uri, ncvar=ncvar)


+def test_activevariable():
+    uri = "tests/test_data/cesm2_native.nc"
+    ncvar = "TREFHT"
+    active = Active(uri, ncvar=ncvar)
+    av = ActiveVariable(active)
+    assert av.ds.shape == (12, 4, 8)
+
+
 @pytest.mark.xfail(reason="We don't employ locks with Pyfive anymore, yet.")
 def test_lock():
     """Unit test for class:Active."""

From 3b6f0ced784c1e75dfb79722337c5bca7709d5bb Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Wed, 26 Feb 2025 14:47:53 +0000
Subject: [PATCH 03/30] set structure

---
 activestorage/active.py | 81 +++++++++++++++--------------------------
 1 file changed, 30 insertions(+), 51 deletions(-)

diff --git a/activestorage/active.py b/activestorage/active.py
index ee5db132..56c439fa 100644
--- a/activestorage/active.py
+++ b/activestorage/active.py
@@ -4,10 +4,12 @@
 import pathlib
 import urllib
 import pyfive
+import s3fs
 import time

-from pyfive.h5d import StoreInfo
-import s3fs
+from pathlib import Path
+from pyfive.h5d import StoreInfo
+from typing import Optional

 from activestorage.config import *
 from activestorage import reductionist
@@ -47,21 +49,6 @@ def load_from_s3(uri, storage_options=None):
     print(f"Dataset loaded from S3 with s3fs and Pyfive: {uri} ({t2-t1:.2},{t3-t2:.2})")
     return ds

-def _metricise(method):
-    """ Decorator for class methods loads into metric_data"""
-    def timed(self, *args, **kw):
-        ts = time.time()
-        metric_name=''
-        if '__metric_name' in kw:
-            metric_name = kw['__metric_name']
-            del kw['__metric_name']
-        result = method(self,*args, **kw)
-        te = time.time()
-        if metric_name:
-            self.metric_data[metric_name] = te-ts
-        return result
-    return timed
-

 def get_missing_attributes(ds):
     """"
@@ -122,13 +109,13 @@ def __new__(cls, *args, **kwargs):

     def __init__(
         self,
-        uri,
-        ncvar,
-        storage_type=None,
-        max_threads=100,
-        storage_options=None,
-        active_storage_url=None
-    ):
+        dataset: Optional[str | Path | object] ,
+        ncvar: str,
+        storage_type: str = None,
+        max_threads: int = 100,
+        storage_options: dict = None,
+        active_storage_url: str = None
+    ) -> None:
         """
         Instantiate with a NetCDF4 dataset URI and the variable of interest within that file.
         (We need the variable, because we need variable specific metadata from within that
@@ -138,15 +125,21 @@ def __init__(
         :param storage_options: s3fs.S3FileSystem options
         :param active_storage_url: Reductionist server URL
         """
-        # Assume NetCDF4 for now
-        self.uri = uri
-        if self.uri is None:
-            raise ValueError(f"Must use a valid file for uri. Got {uri}")
+        input_variable = False
+        if dataset is None:
+            raise ValueError(f"Must use a valid file or variable string for dataset. Got {dataset}")
+        if isinstance(dataset, Path) and not dataset.exists():
+            raise ValueError(f"Path to input file {dataset} does not exist.")
+        if not isinstance(dataset, Path) and not isinstance(dataset, str):
+            print(f"Treating input {dataset} as variable object.")
+            input_variable = True
+        self.uri = dataset
+

         # still allow for a passable storage_type
         # for special cases eg "special-POSIX" ie DDN
         if not storage_type and storage_options is not None:
-            storage_type = urllib.parse.urlparse(uri).scheme
+            storage_type = urllib.parse.urlparse(dataset).scheme
         self.storage_type = storage_type

         # get storage_options
@@ -154,8 +147,9 @@ def __init__(
         self.active_storage_url = active_storage_url

         # basic check on file
-        if not os.path.isfile(self.uri) and not self.storage_type:
-            raise ValueError(f"Must use existing file for uri. {self.uri} not found")
+        if not input_variable:
+            if not os.path.isfile(self.uri) and not self.storage_type:
+                raise ValueError(f"Must use existing file for uri. {self.uri} not found")

         self.ncvar = ncvar
         if self.ncvar is None:
             raise ValueError("Must set a netCDF variable name to slice")

         self._version = 1
@@ -201,8 +195,7 @@ def __getitem__(self, index):
         """
         self.metric_data = {}
         if self.ds is None:
-            self.__load_nc_file(__metric_name='load nc time')
-            #self.__metricise('Load','__load_nc_file')
+            self.__load_nc_file()

         self.missing = self.__get_missing_attributes()

@@ -211,21 +204,20 @@ def __getitem__(self, index):

         if self.method is None and self._version == 0:
             # No active operation
-            return self._get_vanilla(index, __metric_name='vanilla_time')
+            return self._get_vanilla(index)

         elif self._version == 1:
             #FIXME: is the difference between version 1 and 2 still honoured?
-            return self._get_selection(index, __metric_name='selection 1 time (s)')
+            return self._get_selection(index)

         elif self._version == 2:
-            return self._get_selection(index, __metric_name='selection 2 time (s)')
+            return self._get_selection(index)

         else:
             raise ValueError(f'Version {self._version} not supported')

-    @_metricise
     def _get_vanilla(self, index):
         """
         Get the data without any active operation
@@ -299,7 +291,7 @@ def _get_active(self, method, *args):
         """
         raise NotImplementedError

-    @_metricise
+
     def _get_selection(self, *args):
         """
         At this point we have a Dataset object, but all the important information about
@@ -571,16 +563,3 @@ def _mask_data(self, data):
             data = np.ma.masked_less(data, valid_min)

         return data
-
-
-class ActiveVariable():
-    """
-    A netCDF4.Dataset-like variable built on top of the
-    Active class. It preserves all properties and methods
-    a regular netCDF4.Dataset has, but some of them are custom
-    built with Active Storage functionality in mind.
-    """
-    def __init__(self, active):
-        self.ds = active._netCDF4Dataset()
-
-
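Note on the new signature above -- a minimal sketch of the call styles it
admits, assuming only what the type hint Optional[str | Path | object] and
the isinstance() checks in this patch establish (passing an already-open
variable object is only wired up fully by later patches in this series):

    from pathlib import Path
    from activestorage.active import Active

    # path-like input, as before; str and pathlib.Path are both accepted
    active = Active("tests/test_data/cesm2_native.nc", ncvar="TREFHT")
    active = Active(Path("tests/test_data/cesm2_native.nc"), ncvar="TREFHT")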
From bf0c3fd8613fc8132f855b541528ae524201cc9f Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Wed, 26 Feb 2025 14:48:05 +0000
Subject: [PATCH 04/30] add test

---
 tests/unit/test_active.py | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/tests/unit/test_active.py b/tests/unit/test_active.py
index 413ec962..fd73c473 100644
--- a/tests/unit/test_active.py
+++ b/tests/unit/test_active.py
@@ -3,18 +3,19 @@
 import pytest
 import threading

-from activestorage.active import Active, ActiveVariable
+from activestorage.active import Active
 from activestorage.active import load_from_s3
 from activestorage.config import *
 from botocore.exceptions import EndpointConnectionError as botoExc
 from botocore.exceptions import NoCredentialsError as NoCredsExc
+from netCDF4 import Dataset


 def test_uri_none():
     """Unit test for class:Active."""
     # test invalid uri
     some_file = None
-    expected = "Must use a valid file for uri. Got None"
+    expected = "Must use a valid file or variable string for dataset. Got None"
     with pytest.raises(ValueError) as exc:
         active = Active(some_file, ncvar="")
     assert str(exc.value) == expected
@@ -78,15 +79,16 @@ def test_active():
     uri = "tests/test_data/cesm2_native.nc"
     ncvar = "TREFHT"
     active = Active(uri, ncvar=ncvar)
-    init = active.__init__(uri=uri, ncvar=ncvar)
+    init = active.__init__(dataset=uri, ncvar=ncvar)


 def test_activevariable():
     uri = "tests/test_data/cesm2_native.nc"
     ncvar = "TREFHT"
-    active = Active(uri, ncvar=ncvar)
-    av = ActiveVariable(active)
-    assert av.ds.shape == (12, 4, 8)
+    ds = Dataset(uri)
+    av = Active(ds, ncvar)
+    av._method = "min"
+    assert av.method([3,444]) == 3


 @pytest.mark.xfail(reason="We don't employ locks with Pyfive anymore, yet.")

From 35a29d5cadd482e39d75ae41b13a414dfe10c04d Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Wed, 26 Feb 2025 15:42:10 +0000
Subject: [PATCH 05/30] actual test with pyfive variable

---
 activestorage/active.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/activestorage/active.py b/activestorage/active.py
index 56c439fa..fb46b402 100644
--- a/activestorage/active.py
+++ b/activestorage/active.py
@@ -110,7 +110,7 @@ def __new__(cls, *args, **kwargs):
     def __init__(
         self,
         dataset: Optional[str | Path | object] ,
-        ncvar: str,
+        ncvar: str = None,
         storage_type: str = None,
         max_threads: int = 100,
         storage_options: dict = None,
@@ -152,7 +152,7 @@ def __init__(
                 raise ValueError(f"Must use existing file for uri. {self.uri} not found")

         self.ncvar = ncvar
-        if self.ncvar is None:
+        if self.ncvar is None and not input_variable:
             raise ValueError("Must set a netCDF variable name to slice")

         self._version = 1

From 28d97e557a54f4c9ad96b9ab61ff31566c1fc076 Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Wed, 26 Feb 2025 15:42:18 +0000
Subject: [PATCH 06/30] actual test with pyfive variable

---
 tests/unit/test_active.py | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/tests/unit/test_active.py b/tests/unit/test_active.py
index fd73c473..7de4479c 100644
--- a/tests/unit/test_active.py
+++ b/tests/unit/test_active.py
@@ -1,5 +1,6 @@
 import os
 import numpy as np
+import pyfive
 import pytest
 import threading

@@ -82,11 +83,20 @@ def test_active():
     init = active.__init__(dataset=uri, ncvar=ncvar)


-def test_activevariable():
+def test_activevariable_netCDF4():
     uri = "tests/test_data/cesm2_native.nc"
     ncvar = "TREFHT"
     ds = Dataset(uri)
-    av = Active(ds, ncvar)
+    av = Active(ds[ncvar])
+    av._method = "min"
+    assert av.method([3,444]) == 3
+
+
+def test_activevariable_pyfive():
+    uri = "tests/test_data/cesm2_native.nc"
+    ncvar = "TREFHT"
+    ds = pyfive.File(uri)
+    av = Active(ds[ncvar])
     av._method = "min"
     assert av.method([3,444]) == 3
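Note on the two tests above -- a sketch of the behaviour they rely on:
setting the private _method string makes the Active.method property
resolve to the matching numpy reduction (np.min here), with no file reads
involved in the resolution itself:

    import pyfive
    from activestorage.active import Active

    ds = pyfive.File("tests/test_data/cesm2_native.nc")["TREFHT"]
    av = Active(ds)                    # no ncvar needed for a variable object
    av._method = "min"
    assert av.method([3, 444]) == 3    # av.method behaves like np.min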
From 25668bd50e01088dddfd2c822f23f0884ff8be11 Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Wed, 26 Feb 2025 16:18:13 +0000
Subject: [PATCH 07/30] clean up

---
 activestorage/active.py | 22 ----------------------
 1 file changed, 22 deletions(-)

diff --git a/activestorage/active.py b/activestorage/active.py
index fb46b402..440555b2 100644
--- a/activestorage/active.py
+++ b/activestorage/active.py
@@ -161,8 +161,6 @@ def __init__(
         self._max_threads = max_threads
         self.missing = None
         self.ds = None
-        self.netCDF4Dataset = None
-        self.metric_data = {}
         self.data_read = 0

     def __load_nc_file(self):
@@ -179,10 +177,6 @@ def __load_nc_file(self):
         self.ds = nc[ncvar]
         return self.ds

-    def _netCDF4Dataset(self):
-        if not self.netCDF4Dataset:
-            return self.__load_nc_file()
-
     def __get_missing_attributes(self):
         if self.ds is None:
             self.__load_nc_file()
@@ -193,7 +187,6 @@ def __getitem__(self, index):
         Provides support for a standard get item.
         #FIXME-BNL: Why is the argument index?
         """
-        self.metric_data = {}
         if self.ds is None:
             self.__load_nc_file()

@@ -307,10 +300,6 @@ def _get_selection(self, *args):
         name = self.ds.name
         dtype = np.dtype(self.ds.dtype)
         array = pyfive.indexing.ZarrArrayStub(self.ds.shape, self.ds.chunks)
         ds = self.ds.id
-
-        self.metric_data['args'] = args
-        self.metric_data['dataset shape'] = self.ds.shape
-        self.metric_data['dataset chunks'] = self.ds.chunks
         if ds.filter_pipeline is None:
             compressor, filters = None, None
         else:
@@ -359,13 +348,6 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype, compressor, f
         # Because we do this, we need to read the dataset b-tree now, not as we go, so
         # it is already in cache. If we remove the thread pool from here, we probably
         # wouldn't need to do it before the first one.
-
-        if ds.chunks is not None:
-            t1 = time.time()
-            # ds._get_chunk_addresses()
-            t2 = time.time() - t1
-            self.metric_data['indexing time (s)'] = t2
-            # self.metric_data['chunk number'] = len(ds._zchunk_index)
         chunk_count = 0
         t1 = time.time()
         with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_threads) as executor:
@@ -430,10 +412,6 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype, compressor, f
         # size.
         out = out / np.sum(counts).reshape(shape1)

-        t2 = time.time()
-        self.metric_data['reduction time (s)'] = t2-t1
-        self.metric_data['chunks processed'] = chunk_count
-        self.metric_data['storage read (B)'] = self.data_read
         return out

     def _get_endpoint_url(self):
From 8cdd4cbc9bbb5a36a68711d4fa3696292384696c Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Wed, 26 Feb 2025 17:23:31 +0000
Subject: [PATCH 08/30] clean up and add bits

---
 activestorage/active.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/activestorage/active.py b/activestorage/active.py
index 440555b2..6d971511 100644
--- a/activestorage/active.py
+++ b/activestorage/active.py
@@ -125,6 +125,7 @@ def __init__(
         :param storage_options: s3fs.S3FileSystem options
         :param active_storage_url: Reductionist server URL
         """
+        self.ds = None
         input_variable = False
         if dataset is None:
             raise ValueError(f"Must use a valid file or variable string for dataset. Got {dataset}")
@@ -133,6 +134,8 @@ def __init__(
         if not isinstance(dataset, Path) and not isinstance(dataset, str):
             print(f"Treating input {dataset} as variable object.")
             input_variable = True
+            self.ds = dataset
+            self.filename = None
         self.uri = dataset

@@ -160,11 +163,10 @@ def __init__(
         self._method = None
         self._max_threads = max_threads
         self.missing = None
-        self.ds = None
         self.data_read = 0

     def __load_nc_file(self):
-        """ Get the netcdf file and it's b-tree"""
+        """ Get the netcdf file and its b-tree"""
         ncvar = self.ncvar
         # in all cases we need an open netcdf file to get at attributes
         # we keep it open because we need it's b-tree
@@ -173,8 +175,8 @@ def __load_nc_file(self):
         elif self.storage_type == "s3":
             nc = load_from_s3(self.uri, self.storage_options)
         self.filename = self.uri
-        self.ds = nc[ncvar]
+        self.ds = nc[ncvar]

         return self.ds

From a2f41ab21c2564651b02cd50827e9c3e14e23760 Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Wed, 26 Feb 2025 17:23:47 +0000
Subject: [PATCH 09/30] add chunking test case

---
 tests/unit/test_active.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/tests/unit/test_active.py b/tests/unit/test_active.py
index 7de4479c..f44ee605 100644
--- a/tests/unit/test_active.py
+++ b/tests/unit/test_active.py
@@ -86,8 +86,8 @@ def test_active():
 def test_activevariable_netCDF4():
     uri = "tests/test_data/cesm2_native.nc"
     ncvar = "TREFHT"
-    ds = Dataset(uri)
-    av = Active(ds[ncvar])
+    ds = Dataset(uri)[ncvar]
+    av = Active(ds)
     av._method = "min"
     assert av.method([3,444]) == 3

@@ -95,10 +95,11 @@ def test_activevariable_pyfive():
     uri = "tests/test_data/cesm2_native.nc"
     ncvar = "TREFHT"
-    ds = pyfive.File(uri)
-    av = Active(ds[ncvar])
+    ds = pyfive.File(uri)[ncvar]
+    av = Active(ds)
     av._method = "min"
     assert av.method([3,444]) == 3
+    assert av[3:5] == 3

From e471dee0b3ceb3ad63874f248235e69121944699 Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Thu, 27 Feb 2025 13:00:50 +0000
Subject: [PATCH 10/30] add exception if not pyfive dataset

---
 activestorage/active.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/activestorage/active.py b/activestorage/active.py
index 6d971511..0b48790f 100644
--- a/activestorage/active.py
+++ b/activestorage/active.py
@@ -133,9 +133,11 @@ def __init__(
         if not isinstance(dataset, Path) and not isinstance(dataset, str):
             print(f"Treating input {dataset} as variable object.")
+            if not type(dataset) is pyfive.high_level.Dataset:
+                raise TypeError(f"Variable object dataset can only be pyfive.high_level.Dataset. Got {dataset}")
             input_variable = True
             self.ds = dataset
-            self.filename = None
+            self.filename = self.ds
         self.uri = dataset

From 620e2b68bd5958d45aaf1a667136bfde34dcaa61 Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Thu, 27 Feb 2025 13:01:06 +0000
Subject: [PATCH 11/30] test for that exception

---
 tests/unit/test_active.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/tests/unit/test_active.py b/tests/unit/test_active.py
index f44ee605..0175389c 100644
--- a/tests/unit/test_active.py
+++ b/tests/unit/test_active.py
@@ -87,9 +87,10 @@ def test_activevariable_netCDF4():
     uri = "tests/test_data/cesm2_native.nc"
     ncvar = "TREFHT"
     ds = Dataset(uri)[ncvar]
-    av = Active(ds)
-    av._method = "min"
-    assert av.method([3,444]) == 3
+    exc_str = "Variable object dataset can only be pyfive.high_level.Dataset"
+    with pytest.raises(TypeError) as exc:
+        av = Active(ds)
+    assert exc_str in str(exc)


 def test_activevariable_pyfive():
variable object.") + if not type(dataset) is pyfive.high_level.Dataset: + raise TypeError(f"Variable object dataset can only be pyfive.high_level.Dataset. Got {dataset}") input_variable = True self.ds = dataset - self.filename = None + self.filename = self.ds self.uri = dataset From 620e2b68bd5958d45aaf1a667136bfde34dcaa61 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 27 Feb 2025 13:01:06 +0000 Subject: [PATCH 11/30] test for that exception --- tests/unit/test_active.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/unit/test_active.py b/tests/unit/test_active.py index f44ee605..0175389c 100644 --- a/tests/unit/test_active.py +++ b/tests/unit/test_active.py @@ -87,9 +87,10 @@ def test_activevariable_netCDF4(): uri = "tests/test_data/cesm2_native.nc" ncvar = "TREFHT" ds = Dataset(uri)[ncvar] - av = Active(ds) - av._method = "min" - assert av.method([3,444]) == 3 + exc_str = "Variable object dataset can only be pyfive.high_level.Dataset" + with pytest.raises(TypeError) as exc: + av = Active(ds) + assert exc_str in str(exc) def test_activevariable_pyfive(): From dca443f5c41c62dc70b29c16e5613e3489ee603c Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 27 Feb 2025 13:15:07 +0000 Subject: [PATCH 12/30] start reduce chunk with correct syntax --- activestorage/storage.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/activestorage/storage.py b/activestorage/storage.py index 80a575ba..64682073 100644 --- a/activestorage/storage.py +++ b/activestorage/storage.py @@ -27,6 +27,8 @@ def reduce_chunk(rfile, """ #FIXME: for the moment, open the file every time ... we might want to do that, or not + obj_type = type(rfile) + print(f"Reducing chunk of object {obj_type}") with open(rfile,'rb') as open_file: # get the data chunk = read_block(open_file, offset, size) From 1bd40f5eeef36fa417cea6eea6213fa5ca4771b5 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 27 Feb 2025 14:06:49 +0000 Subject: [PATCH 13/30] it bloody works --- activestorage/storage.py | 42 ++++++++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/activestorage/storage.py b/activestorage/storage.py index 64682073..fcb3e81a 100644 --- a/activestorage/storage.py +++ b/activestorage/storage.py @@ -1,7 +1,10 @@ """Active storage module.""" import numpy as np +import pyfive from numcodecs.compat import ensure_ndarray +from pyfive.dataobjects import DatasetID + def reduce_chunk(rfile, offset, size, compression, filters, missing, dtype, shape, @@ -29,18 +32,33 @@ def reduce_chunk(rfile, #FIXME: for the moment, open the file every time ... 
From 1bd40f5eeef36fa417cea6eea6213fa5ca4771b5 Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Thu, 27 Feb 2025 14:06:49 +0000
Subject: [PATCH 13/30] it bloody works

---
 activestorage/storage.py | 42 ++++++++++++++++++++++++++++------------
 1 file changed, 30 insertions(+), 12 deletions(-)

diff --git a/activestorage/storage.py b/activestorage/storage.py
index 64682073..fcb3e81a 100644
--- a/activestorage/storage.py
+++ b/activestorage/storage.py
@@ -1,7 +1,10 @@
 """Active storage module."""
 import numpy as np
+import pyfive

 from numcodecs.compat import ensure_ndarray
+from pyfive.dataobjects import DatasetID
+

 def reduce_chunk(rfile,
                  offset, size, compression, filters, missing, dtype, shape,
@@ -29,18 +32,33 @@ def reduce_chunk(rfile,

     #FIXME: for the moment, open the file every time ... we might want to do that, or not
     obj_type = type(rfile)
     print(f"Reducing chunk of object {obj_type}")
-    with open(rfile,'rb') as open_file:
-        # get the data
-        chunk = read_block(open_file, offset, size)
-        # reverse any compression and filters
-        chunk = filter_pipeline(chunk, compression, filters)
-        # make it a numpy array of bytes
-        chunk = ensure_ndarray(chunk)
-        # convert to the appropriate data type
-        chunk = chunk.view(dtype)
-        # sort out ordering and convert to the parent hyperslab dimensions
-        chunk = chunk.reshape(-1, order='A')
-        chunk = chunk.reshape(shape, order=order)
+    if not obj_type is pyfive.high_level.Dataset:
+        with open(rfile,'rb') as open_file:
+            # get the data
+            chunk = read_block(open_file, offset, size)
+            # reverse any compression and filters
+            chunk = filter_pipeline(chunk, compression, filters)
+            # make it a numpy array of bytes
+            chunk = ensure_ndarray(chunk)
+            # convert to the appropriate data type
+            chunk = chunk.view(dtype)
+            # sort out ordering and convert to the parent hyperslab dimensions
+            chunk = chunk.reshape(-1, order='A')
+            chunk = chunk.reshape(shape, order=order)
+    else:
+        class storeinfo: pass
+        storeinfo.byte_offset = offset
+        storeinfo.size = size
+        chunk = rfile.id._get_raw_chunk(storeinfo)
+        # reverse any compression and filters
+        chunk = filter_pipeline(chunk, compression, filters)
+        # make it a numpy array of bytes
+        chunk = ensure_ndarray(chunk)
+        # convert to the appropriate data type
+        chunk = chunk.view(dtype)
+        # sort out ordering and convert to the parent hyperslab dimensions
+        chunk = chunk.reshape(-1, order='A')
+        chunk = chunk.reshape(shape, order=order)

     tmp = chunk[chunk_selection]
     if method:
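Note on the storeinfo shim above -- it works because, as this patch
assumes of pyfive's internals, DatasetID._get_raw_chunk() only reads the
byte_offset and size attributes of the StoreInfo record it is handed, so
any namespace carrying those two attributes will do. The same trick,
spelled out as a sketch:

    import pyfive

    class StoreInfoShim:
        """Duck-types pyfive.h5d.StoreInfo for _get_raw_chunk()."""
        def __init__(self, byte_offset, size):
            self.byte_offset = byte_offset
            self.size = size

    ds = pyfive.File("tests/test_data/cesm2_native.nc")["TREFHT"]
    # offset/size here are illustrative, not a real chunk address
    raw = ds.id._get_raw_chunk(StoreInfoShim(byte_offset=4096, size=64))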
From 0bdc3106909a3c4c00b0880e29df33f74414eae7 Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Thu, 27 Feb 2025 14:06:56 +0000
Subject: [PATCH 14/30] it bloody works

---
 tests/unit/test_active.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/tests/unit/test_active.py b/tests/unit/test_active.py
index 0175389c..9d602e8d 100644
--- a/tests/unit/test_active.py
+++ b/tests/unit/test_active.py
@@ -100,7 +100,11 @@ def test_activevariable_pyfive():
     av = Active(ds)
     av._method = "min"
     assert av.method([3,444]) == 3
-    assert av[3:5] == 3
+    av_slice_min = av[3:5]
+    assert av_slice_min == np.array(258.62814, dtype="float32")
+    # test with Numpy
+    np_slice_min = np.min(ds[3:5])
+    assert av_slice_min == np_slice_min

From 9f46ab9beb2522e762cbb7b7b680974464c3aa42 Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Thu, 27 Feb 2025 14:17:32 +0000
Subject: [PATCH 15/30] add inline comment

---
 activestorage/storage.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/activestorage/storage.py b/activestorage/storage.py
index fcb3e81a..6e72a488 100644
--- a/activestorage/storage.py
+++ b/activestorage/storage.py
@@ -28,11 +28,13 @@ def reduce_chunk(rfile,
         storage implementations we'll change to controlled vocabulary)

     """
-
-    #FIXME: for the moment, open the file every time ... we might want to do that, or not
     obj_type = type(rfile)
     print(f"Reducing chunk of object {obj_type}")
+
     if not obj_type is pyfive.high_level.Dataset:
+        #FIXME: for the moment, open the file every time ... we might want to do that, or not
+        # we could just use an instance of pyfive.high_level.Dataset.id
+        # passed directly from active.py, as below
         with open(rfile,'rb') as open_file:
             # get the data
             chunk = read_block(open_file, offset, size)
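Note: both branches of reduce_chunk now share the same decode sequence
(raw bytes -> undo compression/filters -> ndarray view -> reshape ->
select -> reduce). A toy, self-contained illustration of that sequence,
with no compression and made-up values:

    import numpy as np
    from numcodecs.compat import ensure_ndarray

    raw = np.arange(12, dtype="float32").tobytes()    # stand-in chunk bytes
    chunk = ensure_ndarray(raw).view("float32")       # bytes -> typed array
    chunk = chunk.reshape(-1, order="A").reshape((3, 4), order="C")
    tmp = chunk[0:2, 1:3]                             # the chunk_selection
    print(np.min(tmp))                                # method applied -> 1.0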
print("Result is", result) + assert result == 5098.625 From 2499066ac20fa11d9b4d6a1096b93c25c6d3604b Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 27 Feb 2025 15:29:11 +0000 Subject: [PATCH 19/30] add note to test --- tests/test_real_s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_real_s3.py b/tests/test_real_s3.py index 630d4c49..2a3f0d50 100644 --- a/tests/test_real_s3.py +++ b/tests/test_real_s3.py @@ -6,7 +6,7 @@ S3_BUCKET = "bnl" - +# TODO Remove after full testing and right before deployment def test_s3_dataset(): """Run somewhat as the 'gold' test.""" storage_options = { From c4950196309aae0316dad75e81ef64a2732357cc Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 27 Feb 2025 15:29:24 +0000 Subject: [PATCH 20/30] remove leftover --- activestorage/active.py | 1 - 1 file changed, 1 deletion(-) diff --git a/activestorage/active.py b/activestorage/active.py index 76c0230a..a8e58457 100644 --- a/activestorage/active.py +++ b/activestorage/active.py @@ -137,7 +137,6 @@ def __init__( raise TypeError(f"Variable object dataset can only be pyfive.high_level.Dataset. Got {dataset}") input_variable = True self.ds = dataset - self.filename = self.ds self.uri = dataset From ad3fb546bfce5a806631188408c08d2043dae2fa Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 27 Feb 2025 15:51:09 +0000 Subject: [PATCH 21/30] unused import --- activestorage/storage.py | 1 - 1 file changed, 1 deletion(-) diff --git a/activestorage/storage.py b/activestorage/storage.py index 6e72a488..56a1ff89 100644 --- a/activestorage/storage.py +++ b/activestorage/storage.py @@ -3,7 +3,6 @@ import pyfive from numcodecs.compat import ensure_ndarray -from pyfive.dataobjects import DatasetID def reduce_chunk(rfile, From 9bf31bd695208cbd941c351d265723ed0368818e Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Thu, 27 Feb 2025 15:51:22 +0000 Subject: [PATCH 22/30] unused return --- activestorage/active.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/activestorage/active.py b/activestorage/active.py index a8e58457..43a54c21 100644 --- a/activestorage/active.py +++ b/activestorage/active.py @@ -184,8 +184,6 @@ def __load_nc_file(self): self.filename = self.uri self.ds = nc[ncvar] - return self.ds - def __get_missing_attributes(self): if self.ds is None: self.__load_nc_file() From 61cf5dd31888519f78c07bbe4e948fb074c0310b Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Fri, 28 Feb 2025 12:07:09 +0000 Subject: [PATCH 23/30] add correct function docstring --- activestorage/active.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/activestorage/active.py b/activestorage/active.py index 43a54c21..ae865314 100644 --- a/activestorage/active.py +++ b/activestorage/active.py @@ -173,10 +173,16 @@ def __init__( self.data_read = 0 def __load_nc_file(self): - """ Get the netcdf file and its b-tree""" + """ + Get the netcdf file and its b-tree. + + This private method is used only if the input to Active + is not a pyfive.high_level.Dataset object. In that case, + any file opening is skipped, and ncvar is not used. The + Dataset object will have already contained the b-tree, + and `_filename` attribute. 
+ """ ncvar = self.ncvar - # in all cases we need an open netcdf file to get at attributes - # we keep it open because we need it's b-tree if self.storage_type is None: nc = pyfive.File(self.uri) elif self.storage_type == "s3": From 4cdeb1b17ec8bbe116d261bc0f603a040e2cb22c Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Fri, 28 Feb 2025 12:09:02 +0000 Subject: [PATCH 24/30] removed obsolete inline --- activestorage/active.py | 1 - 1 file changed, 1 deletion(-) diff --git a/activestorage/active.py b/activestorage/active.py index ae865314..31546b06 100644 --- a/activestorage/active.py +++ b/activestorage/active.py @@ -310,7 +310,6 @@ def _get_selection(self, *args): name = self.ds.name dtype = np.dtype(self.ds.dtype) - # hopefully fix pyfive to get a dtype directly array = pyfive.indexing.ZarrArrayStub(self.ds.shape, self.ds.chunks) ds = self.ds.id if ds.filter_pipeline is None: From 9aca259ab2410c983801728e0db8567a242d42d2 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Fri, 28 Feb 2025 12:11:19 +0000 Subject: [PATCH 25/30] remove obsolete inline --- activestorage/active.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/activestorage/active.py b/activestorage/active.py index 31546b06..e7d5c6a4 100644 --- a/activestorage/active.py +++ b/activestorage/active.py @@ -357,9 +357,6 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype, compressor, f session = None # Process storage chunks using a thread pool. - # Because we do this, we need to read the dataset b-tree now, not as we go, so - # it is already in cache. If we remove the thread pool from here, we probably - # wouldn't need to do it before the first one. chunk_count = 0 t1 = time.time() with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_threads) as executor: From 2507fe4bc78ed2d47b64c82f871bc4b86e4a4467 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Fri, 28 Feb 2025 12:36:41 +0000 Subject: [PATCH 26/30] Update activestorage/active.py Co-authored-by: David Hassell --- activestorage/active.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/activestorage/active.py b/activestorage/active.py index e7d5c6a4..1cf803a4 100644 --- a/activestorage/active.py +++ b/activestorage/active.py @@ -128,7 +128,7 @@ def __init__( self.ds = None input_variable = False if dataset is None: - raise ValueError(f"Must use a valid file or variable string for dataset. Got {dataset}") + raise ValueError(f"Must use a valid file name or variable object for dataset. Got {dataset!r}") if isinstance(dataset, Path) and not dataset.exists(): raise ValueError(f"Path to input file {dataset} does not exist.") if not isinstance(dataset, Path) and not isinstance(dataset, str): From 17684aa10062bafb0831609dd2ce3dd8402d9562 Mon Sep 17 00:00:00 2001 From: Valeriu Predoi Date: Fri, 28 Feb 2025 12:37:19 +0000 Subject: [PATCH 27/30] Update activestorage/active.py Co-authored-by: David Hassell --- activestorage/active.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/activestorage/active.py b/activestorage/active.py index 1cf803a4..977041a4 100644 --- a/activestorage/active.py +++ b/activestorage/active.py @@ -134,7 +134,7 @@ def __init__( if not isinstance(dataset, Path) and not isinstance(dataset, str): print(f"Treating input {dataset} as variable object.") if not type(dataset) is pyfive.high_level.Dataset: - raise TypeError(f"Variable object dataset can only be pyfive.high_level.Dataset. Got {dataset}") + raise TypeError(f"Variable object dataset can only be pyfive.high_level.Dataset. 
From 4cdeb1b17ec8bbe116d261bc0f603a040e2cb22c Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Fri, 28 Feb 2025 12:09:02 +0000
Subject: [PATCH 24/30] removed obsolete inline

---
 activestorage/active.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/activestorage/active.py b/activestorage/active.py
index ae865314..31546b06 100644
--- a/activestorage/active.py
+++ b/activestorage/active.py
@@ -310,7 +310,6 @@ def _get_selection(self, *args):

         name = self.ds.name
         dtype = np.dtype(self.ds.dtype)
-        # hopefully fix pyfive to get a dtype directly
         array = pyfive.indexing.ZarrArrayStub(self.ds.shape, self.ds.chunks)
         ds = self.ds.id

From 9aca259ab2410c983801728e0db8567a242d42d2 Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Fri, 28 Feb 2025 12:11:19 +0000
Subject: [PATCH 25/30] remove obsolete inline

---
 activestorage/active.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/activestorage/active.py b/activestorage/active.py
index 31546b06..e7d5c6a4 100644
--- a/activestorage/active.py
+++ b/activestorage/active.py
@@ -357,9 +357,6 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype, compressor, f
             session = None

         # Process storage chunks using a thread pool.
-        # Because we do this, we need to read the dataset b-tree now, not as we go, so
-        # it is already in cache. If we remove the thread pool from here, we probably
-        # wouldn't need to do it before the first one.
         chunk_count = 0
         t1 = time.time()
         with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_threads) as executor:

From 2507fe4bc78ed2d47b64c82f871bc4b86e4a4467 Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Fri, 28 Feb 2025 12:36:41 +0000
Subject: [PATCH 26/30] Update activestorage/active.py

Co-authored-by: David Hassell
---
 activestorage/active.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/activestorage/active.py b/activestorage/active.py
index e7d5c6a4..1cf803a4 100644
--- a/activestorage/active.py
+++ b/activestorage/active.py
@@ -128,7 +128,7 @@ def __init__(
         self.ds = None
         input_variable = False
         if dataset is None:
-            raise ValueError(f"Must use a valid file or variable string for dataset. Got {dataset}")
+            raise ValueError(f"Must use a valid file name or variable object for dataset. Got {dataset!r}")
         if isinstance(dataset, Path) and not dataset.exists():
             raise ValueError(f"Path to input file {dataset} does not exist.")
         if not isinstance(dataset, Path) and not isinstance(dataset, str):

From 17684aa10062bafb0831609dd2ce3dd8402d9562 Mon Sep 17 00:00:00 2001
From: Valeriu Predoi
Date: Fri, 28 Feb 2025 12:37:19 +0000
Subject: [PATCH 27/30] Update activestorage/active.py

Co-authored-by: David Hassell
---
 activestorage/active.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/activestorage/active.py b/activestorage/active.py
index 1cf803a4..977041a4 100644
--- a/activestorage/active.py
+++ b/activestorage/active.py
@@ -134,7 +134,7 @@ def __init__(
         if not isinstance(dataset, Path) and not isinstance(dataset, str):
             print(f"Treating input {dataset} as variable object.")
             if not type(dataset) is pyfive.high_level.Dataset:
-                raise TypeError(f"Variable object dataset can only be pyfive.high_level.Dataset. Got {dataset}")
+                raise TypeError(f"Variable object dataset can only be pyfive.high_level.Dataset. Got {dataset!r}")
             input_variable = True
             self.ds = dataset
         self.uri = dataset
dataset:", pie_ds["ta"]) + av = Active(pie_ds["ta"]) + av._method = "min" + assert av.method([3,444]) == 3 + av_slice_min = av[3:5] + assert av_slice_min == np.array(249.6583, dtype="float32")