diff --git a/.github/workflows/test_s3_minio.yml b/.github/workflows/test_s3_minio.yml index 11071de6..c1399441 100644 --- a/.github/workflows/test_s3_minio.yml +++ b/.github/workflows/test_s3_minio.yml @@ -69,6 +69,7 @@ jobs: - name: Run S3 exploratory tests run: | pytest tests/s3_exploratory/test_s3_reduction.py --html=test-reports/s3-exploratory-report.html + if: always() - name: Install pytest-monitor run: pip install pytest-monitor - name: Run S3 performance tests diff --git a/.github/workflows/test_s3_remote_reductionist.yml b/.github/workflows/test_s3_remote_reductionist.yml new file mode 100644 index 00000000..438703c0 --- /dev/null +++ b/.github/workflows/test_s3_remote_reductionist.yml @@ -0,0 +1,68 @@ +# adapted GA workflow from https://github.com/stackhpc/reductionist-rs +# This runs Active with a remote Reductionist and S3 data stored elsewhere +--- +name: S3/Remote Reductionist + +on: + push: + branches: + - main # keep this at all times + pull_request: + schedule: + - cron: '0 0 * * *' # nightly + +# Required shell entrypoint to have properly configured bash shell +defaults: + run: + shell: bash -l {0} + +jobs: + linux-test: + runs-on: "ubuntu-latest" + strategy: + matrix: + python-version: ["3.9", "3.10", "3.11", "3.12"] + fail-fast: false + name: Linux Python ${{ matrix.python-version }} + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 0 + - uses: conda-incubator/setup-miniconda@v2 + with: + python-version: ${{ matrix.python-version }} + miniforge-version: "latest" + miniforge-variant: Mambaforge + use-mamba: true + - name: Get conda and Python versions + run: | + conda --version + python -V + - name: Export proxy + run: | + echo 'USE_S3 = True' >> activestorage/config.py + - name: Ping remote Reductionist + run: curl -k https://192.171.169.248:8080/.well-known/reductionist-schema + - uses: conda-incubator/setup-miniconda@v2 + with: + activate-environment: activestorage-minio + environment-file: environment.yml + python-version: ${{ matrix.python-version }} + miniforge-version: "latest" + miniforge-variant: Mambaforge + use-mamba: true + - name: Install PyActiveStorage + run: | + conda --version + python -V + which python + pip install -e . + - name: Run one single test + run: | + pytest tests/test_compression_remote_reductionist.py + - name: Upload HTML report artifact + uses: actions/upload-artifact@v3 + with: + name: html-report + path: test-reports/ + if: always() diff --git a/activestorage/active.py b/activestorage/active.py index 841c3a60..2bf27ef4 100644 --- a/activestorage/active.py +++ b/activestorage/active.py @@ -20,7 +20,7 @@ @contextlib.contextmanager -def load_from_s3(uri): +def load_from_s3(uri, storage_options=None): """ Load a netCDF4-like object from S3. 
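For readers of the diff, a hedged usage sketch of the new storage_options keyword on load_from_s3; the bucket name, credentials and endpoint below are placeholders, not values taken from this change — only the call shapes come from the patch.

# Sketch: two ways of driving load_from_s3 after this change.
from activestorage.active import load_from_s3

# 1) No storage_options: falls back to S3_ACCESS_KEY / S3_SECRET_KEY / S3_URL
#    preset in activestorage/config.py.
with load_from_s3("s3://my-bucket/file.nc") as ds:          # placeholder URI
    print(ds)

# 2) Explicit storage_options: the dict is handed straight to
#    s3fs.S3FileSystem(**storage_options) inside load_from_s3.
storage_options = {
    "key": "minioadmin",                                     # placeholder credential
    "secret": "minioadmin",                                  # placeholder credential
    "client_kwargs": {"endpoint_url": "http://localhost:9000"},
}
with load_from_s3("s3://my-bucket/file.nc", storage_options=storage_options) as ds:
    print(ds.variables)

Either way the object is opened through h5netcdf over an s3fs file handle, so no local copy of the netCDF file is needed.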
@@ -34,10 +34,15 @@ def load_from_s3(uri): '' instead, we use h5netcdf: https://github.com/h5netcdf/h5netcdf a Python binder straight to HDF5-netCDF4 interface, that doesn't need a "local" file + + storage_options: kwarg dict containing S3 credentials passed straight to Active """ - fs = s3fs.S3FileSystem(key=S3_ACCESS_KEY, # eg "minioadmin" for Minio - secret=S3_SECRET_KEY, # eg "minioadmin" for Minio - client_kwargs={'endpoint_url': S3_URL}) # eg "http://localhost:9000" for Minio + if storage_options is None: # use pre-configured S3 credentials + fs = s3fs.S3FileSystem(key=S3_ACCESS_KEY, # eg "minioadmin" for Minio + secret=S3_SECRET_KEY, # eg "minioadmin" for Minio + client_kwargs={'endpoint_url': S3_URL}) # eg "http://localhost:9000" for Minio + else: + fs = s3fs.S3FileSystem(**storage_options) # use passed-in dictionary with fs.open(uri, 'rb') as s3file: ds = h5netcdf.File(s3file, 'r', invalid_netcdf=True) print(f"Dataset loaded from S3 via h5netcdf: {ds}") @@ -68,20 +73,43 @@ def __new__(cls, *args, **kwargs): } return instance - def __init__(self, uri, ncvar, storage_type=None, max_threads=100): + def __init__( + self, + uri, + ncvar, + storage_type=None, + max_threads=100, + storage_options=None, + active_storage_url=None + ): """ Instantiate with a NetCDF4 dataset and the variable of interest within that file. (We need the variable, because we need variable specific metadata from within that file, however, if that information is available at instantiation, it can be provided using keywords and avoid a metadata read.) + + :param storage_options: s3fs.S3FileSystem options + :param active_storage_url: Reductionist server URL """ # Assume NetCDF4 for now self.uri = uri if self.uri is None: raise ValueError(f"Must use a valid file for uri. Got {self.uri}") + + # still allow for a passable storage_type + # for special cases eg "special-POSIX" ie DDN + if not storage_type and storage_options is not None: + storage_type = urllib.parse.urlparse(uri).scheme self.storage_type = storage_type + + # get storage_options + self.storage_options = storage_options + self.active_storage_url = active_storage_url + + # basic check on file if not os.path.isfile(self.uri) and not self.storage_type: raise ValueError(f"Must use existing file for uri. 
{self.uri} not found") + self.ncvar = ncvar if self.ncvar is None: raise ValueError("Must set a netCDF variable name to slice") @@ -107,13 +135,13 @@ def __getitem__(self, index): lock = self.lock if lock: lock.acquire() - + if self.storage_type is None: nc = Dataset(self.uri) data = nc[ncvar][index] nc.close() elif self.storage_type == "s3": - with load_from_s3(self.uri) as nc: + with load_from_s3(self.uri, self.storage_options) as nc: data = nc[ncvar][index] data = self._mask_data(data, nc[ncvar]) @@ -238,7 +266,8 @@ def _via_kerchunk(self, index): ds, zarray, zattrs = nz.load_netcdf_zarr_generic( self.uri, self.ncvar, - self.storage_type + self.storage_type, + self.storage_options, ) # The following is a hangove from exploration # and is needed if using the original doing it ourselves @@ -390,6 +419,20 @@ def _from_storage(self, stripped_indexer, drop_axes, out_shape, out_dtype, return out + def _get_endpoint_url(self): + """Return the endpoint_url of an S3 object store, or `None`""" + endpoint_url = self.storage_options.get('endpoint_url') + if endpoint_url is not None: + return endpoint_url + + client_kwargs = self.storage_options.get('client_kwargs') + if client_kwargs: + endpoint_url = client_kwargs.get('endpoint_url') + if endpoint_url is not None: + return endpoint_url + + return f"http://{urllib.parse.urlparse(self.filename).netloc}" + def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts, out_selection, compressor, filters, missing, drop_axes=None): @@ -406,6 +449,7 @@ def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts, key = f"{self.ncvar}/{coord}" rfile, offset, size = tuple(fsref[key]) + # S3: pass in pre-configured storage options (credentials) if self.storage_type == "s3": parsed_url = urllib.parse.urlparse(rfile) bucket = parsed_url.netloc @@ -413,15 +457,36 @@ def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts, # FIXME: We do not get the correct byte order on the Zarr Array's dtype # when using S3, so use the value captured earlier. dtype = self._dtype - tmp, count = reductionist.reduce_chunk(session, S3_ACTIVE_STORAGE_URL, - S3_URL, - bucket, object, offset, - size, compressor, filters, - missing, dtype, - self.zds._chunks, - self.zds._order, - chunk_selection, - operation=self._method) + if self.storage_options is None: + tmp, count = reductionist.reduce_chunk(session, + S3_ACTIVE_STORAGE_URL, + S3_URL, + bucket, object, offset, + size, compressor, filters, + missing, dtype, + self.zds._chunks, + self.zds._order, + chunk_selection, + operation=self._method) + else: + # special case for "anon=True" buckets that work only with e.g. 
+ # fs = s3fs.S3FileSystem(anon=True, client_kwargs={'endpoint_url': S3_URL}) + # where file uri = bucketX/fileY.mc + print("S3 Storage options to Reductionist:", self.storage_options) + if self.storage_options.get("anon", None) == True: + bucket = os.path.dirname(parsed_url.path) # bucketX + object = os.path.basename(parsed_url.path) # fileY + print("S3 anon=True Bucket and File:", bucket, object) + tmp, count = reductionist.reduce_chunk(session, + self.active_storage_url, + self._get_endpoint_url(), + bucket, object, offset, + size, compressor, filters, + missing, dtype, + self.zds._chunks, + self.zds._order, + chunk_selection, + operation=self._method) else: # note there is an ongoing discussion about this interface, and what it returns # see https://github.com/valeriupredoi/PyActiveStorage/issues/33 diff --git a/activestorage/netcdf_to_zarr.py b/activestorage/netcdf_to_zarr.py index 782ff729..4bd0bb82 100644 --- a/activestorage/netcdf_to_zarr.py +++ b/activestorage/netcdf_to_zarr.py @@ -10,9 +10,10 @@ from kerchunk.hdf import SingleHdf5ToZarr -def gen_json(file_url, varname, outf, storage_type): +def gen_json(file_url, varname, outf, storage_type, storage_options): """Generate a json file that contains the kerchunk-ed data for Zarr.""" - if storage_type == "s3": + # S3 configuration presets + if storage_type == "s3" and storage_options is None: fs = s3fs.S3FileSystem(key=S3_ACCESS_KEY, secret=S3_SECRET_KEY, client_kwargs={'endpoint_url': S3_URL}, @@ -26,6 +27,21 @@ def gen_json(file_url, varname, outf, storage_type): with fs2.open(outf, 'wb') as f: content = h5chunks.translate() f.write(ujson.dumps(content).encode()) + + # S3 passed-in configuration + elif storage_type == "s3" and storage_options is not None: + storage_options = storage_options.copy() + storage_options['default_fill_cache'] = False + storage_options['default_cache_type'] = "none" + fs = s3fs.S3FileSystem(**storage_options) + fs2 = fsspec.filesystem('') + with fs.open(file_url, 'rb') as s3file: + h5chunks = SingleHdf5ToZarr(s3file, file_url, + inline_threshold=0) + with fs2.open(outf, 'wb') as f: + content = h5chunks.translate() + f.write(ujson.dumps(content).encode()) + # not S3 else: fs = fsspec.filesystem('') with fs.open(file_url, 'rb') as local_file: @@ -77,13 +93,17 @@ def open_zarr_group(out_json, varname): return zarr_array -def load_netcdf_zarr_generic(fileloc, varname, storage_type, build_dummy=True): +def load_netcdf_zarr_generic(fileloc, varname, storage_type, storage_options, build_dummy=True): """Pass a netCDF4 file to be shaped as Zarr file by kerchunk.""" print(f"Storage type {storage_type}") # Write the Zarr group JSON to a temporary file. 
with tempfile.NamedTemporaryFile() as out_json: - _, zarray, zattrs = gen_json(fileloc, varname, out_json.name, storage_type) + _, zarray, zattrs = gen_json(fileloc, + varname, + out_json.name, + storage_type, + storage_options) # open this monster print(f"Attempting to open and convert {fileloc}.") diff --git a/activestorage/reductionist.py b/activestorage/reductionist.py index 039dd4fe..654b54ba 100644 --- a/activestorage/reductionist.py +++ b/activestorage/reductionist.py @@ -19,7 +19,7 @@ def get_session(username: str, password: str, cacert: typing.Optional[str]) -> r """ session = requests.Session() session.auth = (username, password) - session.verify = cacert or True + session.verify = cacert or False return session @@ -53,6 +53,7 @@ def reduce_chunk(session, server, source, bucket, object, """ request_data = build_request_data(source, bucket, object, offset, size, compression, filters, missing, dtype, shape, order, chunk_selection) + print("Reductionist request data dictionary:", request_data) api_operation = "sum" if operation == "mean" else operation or "select" url = f'{server}/v1/{api_operation}/' response = request(session, url, request_data) diff --git a/tests/test_compression.py b/tests/test_compression.py index ae1b8b79..1cba14b1 100644 --- a/tests/test_compression.py +++ b/tests/test_compression.py @@ -40,6 +40,27 @@ def create_compressed_dataset(tmp_path: str, compression: str, shuffle: bool): return test_data +STORAGE_OPTIONS_CLASSIC = { + 'key': S3_ACCESS_KEY, + 'secret': S3_SECRET_KEY, + 'client_kwargs': {'endpoint_url': S3_URL}, +} +S3_ACTIVE_URL_MINIO = S3_ACTIVE_STORAGE_URL +S3_ACTIVE_URL_Bryan = "https://192.171.169.248:8080" + +# TODO include all supported configuration types +# so far test three possible configurations for storage_options: +# - storage_options = None, active_storage_url = None (Minio and local Reductionist, preset credentials from config.py) +# - storage_options = CLASSIC, active_storage_url = CLASSIC (Minio and local Reductionist, preset credentials from config.py but folded in storage_options and active_storage_url) +# - storage_options = CLASSIC, active_storage_url = Bryan's machine (Minio BUT Reductionist moved on Bryan's machine) +# (this invariably fails due to data URL being //localhost:9000 closed to outside Reductionist +storage_options_paramlist = [ + (None, None), + (STORAGE_OPTIONS_CLASSIC, S3_ACTIVE_URL_MINIO), +# (STORAGE_OPTIONS_CLASSIC, S3_ACTIVE_URL_Bryan) +] + + @pytest.mark.parametrize('compression', ['zlib']) @pytest.mark.parametrize('shuffle', [False, True]) def test_compression_and_filters(tmp_path: str, compression: str, shuffle: bool): @@ -55,7 +76,8 @@ def test_compression_and_filters(tmp_path: str, compression: str, shuffle: bool) assert result == 740.0 -def test_compression_and_filters_cmip6_data(): +@pytest.mark.parametrize("storage_options, active_storage_url", storage_options_paramlist) +def test_compression_and_filters_cmip6_data(storage_options, active_storage_url): """ Test use of datasets with compression and filters applied for a real CMIP6 dataset (CMIP6_IPSL-CM6A-LR_tas). 
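A hedged sketch of what the two enabled parametrisations boil down to when they reach Active; the URI and storage_type are illustrative (the real tests use a local CMIP6 file and utils.get_storage_type()), while the option values mirror the config.py presets imported above.

# Sketch only: same credentials, delivered two different ways.
from activestorage.active import Active
from activestorage.config import (S3_ACCESS_KEY, S3_SECRET_KEY, S3_URL,
                                  S3_ACTIVE_STORAGE_URL)

# (None, None): Active and its helpers fall back to the config.py presets.
active_preset = Active("s3://pyactivestorage/cmip6-test.nc", "tas", "s3")

# (STORAGE_OPTIONS_CLASSIC, S3_ACTIVE_URL_MINIO): the same values, carried
# explicitly through the new keywords instead of being read from config.py.
active_explicit = Active(
    "s3://pyactivestorage/cmip6-test.nc", "tas", "s3",
    storage_options={"key": S3_ACCESS_KEY,
                     "secret": S3_SECRET_KEY,
                     "client_kwargs": {"endpoint_url": S3_URL}},
    active_storage_url=S3_ACTIVE_STORAGE_URL,
)

active_explicit._version = 1
active_explicit._method = "min"
result = active_explicit[0:2, 4:6, 7:9]   # reduction pushed to Reductionist

The third pairing (CLASSIC options against the remote Reductionist) stays commented out in the parametrisation list because, as noted above, the MinIO data URL is localhost-only and unreachable from an external Reductionist.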
@@ -69,7 +91,9 @@ def test_compression_and_filters_cmip6_data(): check_dataset_filters(test_file, "tas", "zlib", False) - active = Active(test_file, 'tas', utils.get_storage_type()) + active = Active(test_file, 'tas', utils.get_storage_type(), + storage_options=storage_options, + active_storage_url=active_storage_url) active._version = 1 active._method = "min" result = active[0:2,4:6,7:9] @@ -77,7 +101,8 @@ def test_compression_and_filters_cmip6_data(): assert result == 239.25946044921875 -def test_compression_and_filters_obs4mips_data(): +@pytest.mark.parametrize("storage_options, active_storage_url", storage_options_paramlist) +def test_compression_and_filters_obs4mips_data(storage_options, active_storage_url): """ Test use of datasets with compression and filters applied for a real obs4mips dataset (obs4MIPS_CERES-EBAF_L3B_Ed2-8_rlut.nc) at CMIP5 MIP standard @@ -92,7 +117,9 @@ def test_compression_and_filters_obs4mips_data(): check_dataset_filters(test_file, "rlut", "zlib", False) - active = Active(test_file, 'rlut', utils.get_storage_type()) + active = Active(test_file, 'rlut', utils.get_storage_type(), + storage_options=storage_options, + active_storage_url=active_storage_url) active._version = 1 active._method = "min" result = active[0:2,4:6,7:9] diff --git a/tests/test_compression_remote_reductionist.py b/tests/test_compression_remote_reductionist.py new file mode 100644 index 00000000..321c6cef --- /dev/null +++ b/tests/test_compression_remote_reductionist.py @@ -0,0 +1,109 @@ +import os +import numpy as np +import pytest + +from netCDF4 import Dataset +from pathlib import Path + +from activestorage.active import Active, load_from_s3 +from activestorage.config import * +from activestorage.dummy_data import make_compressed_ncdata +from activestorage.reductionist import ReductionistError as RedErr + +import utils + + +# Bryan's S3 machine + Bryan's reductionist +STORAGE_OPTIONS_Bryan = { + 'anon': True, + 'client_kwargs': {'endpoint_url': "https://uor-aces-o.s3-ext.jc.rl.ac.uk"}, +} +S3_ACTIVE_URL_Bryan = "https://192.171.169.248:8080" +# TODO include all supported configuration types +storage_options_paramlist = [ + (STORAGE_OPTIONS_Bryan, S3_ACTIVE_URL_Bryan) +] +# bucket needed too for this test only +# otherwise, bucket is extracted automatically from full file uri +S3_BUCKET = "bnl" + + +@pytest.mark.parametrize("storage_options, active_storage_url", storage_options_paramlist) +def test_compression_and_filters_cmip6_data(storage_options, active_storage_url): + """ + Test use of datasets with compression and filters applied for a real + CMIP6 dataset (CMIP6-test.nc) - an IPSL file. + + This test will always pass when USE_S3 = False; equally, it will always + fail if USE_S3 = True until Reductionist supports anon=True S3 buckets. + See following test below with a forced storage_type="s3" that mimicks + locally the fail, and catches it. 
Equally, we catch the same exception when USE_S3=True + + Important info on session data: + S3 Storage options to Reductionist: {'anon': True, 'client_kwargs': {'endpoint_url': 'https://uor-aces-o.s3-ext.jc.rl.ac.uk'}} + S3 anon=True Bucket and File: bnl CMIP6-test.nc + Reductionist request data dictionary: {'source': 'https://uor-aces-o.s3-ext.jc.rl.ac.uk', 'bucket': 'bnl', 'object': 'CMIP6-test.nc', 'dtype': 'float32', 'byte_order': 'little', 'offset': 29385, 'size': 942518, 'order': 'C', 'shape': (15, 143, 144), 'selection': [[0, 2, 1], [4, 6, 1], [7, 9, 1]], 'compression': {'id': 'zlib'}} + """ + test_file = str(Path(__file__).resolve().parent / 'test_data' / 'CMIP6-test.nc') + with Dataset(test_file) as nc_data: + nc_min = np.min(nc_data["tas"][0:2,4:6,7:9]) + print(f"Numpy min from compressed file {nc_min}") + + # TODO remember that the special case for "anon=True" buckets is that + # the actual file uri = "bucket/filename" + if USE_S3: + ofile = os.path.basename(test_file) + test_file_uri = os.path.join(S3_BUCKET, ofile) + else: + test_file_uri = test_file + active = Active(test_file_uri, 'tas', utils.get_storage_type(), + storage_options=storage_options, + active_storage_url=active_storage_url) + active._version = 1 + active._method = "min" + + if USE_S3: + # for now anon=True S3 buckets are not supported by Reductionist + with pytest.raises(RedErr) as rederr: + result = active[0:2,4:6,7:9] + access_denied_err = 'code: \\"AccessDenied\\"' + assert access_denied_err in str(rederr.value) + # assert nc_min == result + # assert result == 239.25946044921875 + else: + result = active[0:2,4:6,7:9] + assert nc_min == result + assert result == 239.25946044921875 + + +@pytest.mark.parametrize("storage_options, active_storage_url", storage_options_paramlist) +def test_compression_and_filters_cmip6_forced_s3_from_local(storage_options, active_storage_url): + """ + Test use of datasets with compression and filters applied for a real + CMIP6 dataset (CMIP6-test.nc) - an IPSL file. + + This is for a special anon=True bucket ONLY. 
+ """ + test_file = str(Path(__file__).resolve().parent / 'test_data' / 'CMIP6-test.nc') + with Dataset(test_file) as nc_data: + nc_min = np.min(nc_data["tas"][0:2,4:6,7:9]) + print(f"Numpy min from compressed file {nc_min}") + + # TODO remember that the special case for "anon=True" buckets is that + # the actual file uri = "bucket/filename" + ofile = os.path.basename(test_file) + test_file_uri = os.path.join(S3_BUCKET, ofile) + active = Active(test_file_uri, 'tas', storage_type="s3", + storage_options=storage_options, + active_storage_url=active_storage_url) + + active._version = 1 + active._method = "min" + + # for now anon=True S3 buckets are not supported by Reductionist + with pytest.raises(RedErr) as rederr: + result = active[0:2,4:6,7:9] + access_denied_err = 'code: \\"AccessDenied\\"' + assert access_denied_err in str(rederr.value) + # assert nc_min == result + # assert result == 239.25946044921875 diff --git a/tests/test_data/CMIP6-test.nc b/tests/test_data/CMIP6-test.nc new file mode 100644 index 00000000..fcbb8e4f Binary files /dev/null and b/tests/test_data/CMIP6-test.nc differ diff --git a/tests/unit/test_active.py b/tests/unit/test_active.py index 232ffac9..2f95fdee 100644 --- a/tests/unit/test_active.py +++ b/tests/unit/test_active.py @@ -7,6 +7,7 @@ from activestorage.active import load_from_s3 from activestorage.config import * from botocore.exceptions import EndpointConnectionError as botoExc +from botocore.exceptions import NoCredentialsError as NoCredsExc def test_uri_none(): @@ -101,7 +102,7 @@ def test_lock(): assert active.lock is False -@pytest.mark.skipif(USE_S3 = True, reason="it will look for silly bucket") +@pytest.mark.skipif(USE_S3 == True, reason="it will look for silly bucket") def test_load_from_s3(): """Test basic load from S3 without loading from S3.""" uri = "s3://bucket/file.nc" @@ -110,3 +111,29 @@ def test_load_from_s3(): with load_from_s3(uri) as nc: data = nc["cow"][0] assert expected_exc in str(exc.value) + + +@pytest.mark.skipif(USE_S3 == True, reason="it will look for silly bucket") +def test_load_from_s3_so_None(): + """Test basic load from S3 without loading from S3.""" + uri = "s3://bucket/file.nc" + expected_exc = "Unable to locate credentials" + with pytest.raises(NoCredsExc) as exc: + with load_from_s3(uri, storage_options={}) as nc: + data = nc["cow"][0] + assert expected_exc in str(exc.value) + + +@pytest.mark.skipif(USE_S3 == True, reason="it will look for silly URIs") +def test_get_endpoint_url(): + """Test _get_endpoint_url(self) from Active class.""" + storage_options = { + 'key': "cow", + 'secret': "secretcow", + 'client_kwargs': {'endpoint_url': "https://cow.moo"}, + } + uri = "tests/test_data/cesm2_native.nc" + ncvar = "TREFHT" + active = Active(uri, ncvar=ncvar, storage_options=storage_options) + ep_url = Active._get_endpoint_url(active) + assert ep_url == "https://cow.moo" diff --git a/tests/unit/test_reductionist.py b/tests/unit/test_reductionist.py index e509361c..417bad8b 100644 --- a/tests/unit/test_reductionist.py +++ b/tests/unit/test_reductionist.py @@ -49,7 +49,8 @@ def test_reduce_chunk_defaults(mock_request): session = reductionist.get_session(access_key, secret_key, cacert) assert session.auth == (access_key, secret_key) - assert session.verify + # FIXME this is hacky and comes from peasantly setting the cacert to False in reductionist.py + assert not session.verify tmp, count = reductionist.reduce_chunk(session, active_url, s3_url, bucket, object, offset, diff --git a/tests/unit/test_storage_types.py 
b/tests/unit/test_storage_types.py index 82d6761a..561c36ff 100644 --- a/tests/unit/test_storage_types.py +++ b/tests/unit/test_storage_types.py @@ -39,8 +39,8 @@ def test_s3(mock_reduce, mock_nz, mock_load, tmp_path): def load_from_s3(uri): yield h5netcdf.File(test_file, 'r', invalid_netcdf=True) - def load_netcdf_zarr_generic(uri, ncvar, storage_type): - return old_netcdf_to_zarr(test_file, ncvar, None) + def load_netcdf_zarr_generic(uri, ncvar, storage_type, storage_options=None): + return old_netcdf_to_zarr(test_file, ncvar, None, None) def reduce_chunk( session, @@ -92,7 +92,7 @@ def reduce_chunk( # S3 loading is not done from Active anymore mock_load.assert_not_called() - mock_nz.assert_called_once_with(uri, "data", "s3") + mock_nz.assert_called_once_with(uri, "data", "s3", None) # NOTE: This gets called multiple times with various arguments. Match on # the common ones. mock_reduce.assert_called_with( @@ -119,7 +119,7 @@ def test_reductionist_version_0(mock_load, tmp_path): """Test stack when call to Active contains storage_type == s3 using version 0.""" @contextlib.contextmanager - def load_from_s3(uri): + def load_from_s3(uri, storage_options=None): yield h5netcdf.File(test_file, 'r', invalid_netcdf=True) mock_load.side_effect = load_from_s3 @@ -158,8 +158,8 @@ def test_reductionist_connection(mock_reduce, mock_nz, mock_load, tmp_path): def load_from_s3(uri): yield h5netcdf.File(test_file, 'r', invalid_netcdf=True) - def load_netcdf_zarr_generic(uri, ncvar, storage_type): - return old_netcdf_to_zarr(test_file, ncvar, None) + def load_netcdf_zarr_generic(uri, ncvar, storage_type, storage_options=None): + return old_netcdf_to_zarr(test_file, ncvar, None, None) mock_load.side_effect = load_from_s3 mock_nz.side_effect = load_netcdf_zarr_generic @@ -187,8 +187,8 @@ def test_reductionist_bad_request(mock_reduce, mock_nz, mock_load, tmp_path): def load_from_s3(uri): yield h5netcdf.File(test_file, 'r', invalid_netcdf=True) - def load_netcdf_zarr_generic(uri, ncvar, storage_type): - return old_netcdf_to_zarr(test_file, ncvar, None) + def load_netcdf_zarr_generic(uri, ncvar, storage_type, storage_options=None): + return old_netcdf_to_zarr(test_file, ncvar, None, None) mock_load.side_effect = load_from_s3 mock_nz.side_effect = load_netcdf_zarr_generic
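To close, a hedged walk-through of how the anon=True configuration turns into the arguments that reductionist.reduce_chunk() receives. The "<bucket>/<file>" URI shape and the values are copied from the session data logged in tests/test_compression_remote_reductionist.py above; the helper-by-helper mapping is a sketch of the new code path, not the exact call chain.

import os
import urllib.parse

uri = "bnl/CMIP6-test.nc"                 # anon buckets pass "<bucket>/<file>"
storage_options = {
    "anon": True,
    "client_kwargs": {"endpoint_url": "https://uor-aces-o.s3-ext.jc.rl.ac.uk"},
}
active_storage_url = "https://192.171.169.248:8080"

# Endpoint resolution mirrors Active._get_endpoint_url(): a top-level
# 'endpoint_url' wins, then client_kwargs['endpoint_url'], then the URI netloc.
endpoint_url = (storage_options.get("endpoint_url")
                or storage_options.get("client_kwargs", {}).get("endpoint_url"))

# The anon=True branch in Active._process_chunk re-derives bucket and object
# from the URI path instead of the netloc.
parsed = urllib.parse.urlparse(uri)
bucket = os.path.dirname(parsed.path)     # "bnl"
obj = os.path.basename(parsed.path)       # "CMIP6-test.nc"

# These become reduce_chunk's (server, source, bucket, object) arguments.
print(active_storage_url, endpoint_url, bucket, obj)

Until Reductionist supports anon=True buckets, the request built from these values is expected to come back with AccessDenied, which is exactly what the new remote-Reductionist tests assert.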