
Commit 8517401

Merge pull request #182 from valeriupredoi/storage_options_local_branch
Allow for externally-supplied `storage_options` to S3 loader and further down to Reductionist
2 parents bb769e5 + 8812593 commit 8517401

11 files changed (+355, -36 lines)

.github/workflows/test_s3_minio.yml (+1)

@@ -69,6 +69,7 @@ jobs:
       - name: Run S3 exploratory tests
         run: |
           pytest tests/s3_exploratory/test_s3_reduction.py --html=test-reports/s3-exploratory-report.html
+        if: always()
       - name: Install pytest-monitor
         run: pip install pytest-monitor
       - name: Run S3 performance tests
New workflow file (+68)

@@ -0,0 +1,68 @@
+# adapted GA workflow from https://github.com/stackhpc/reductionist-rs
+# This runs Active with a remote Reductionist and S3 data stored elsewhere
+---
+name: S3/Remote Reductionist
+
+on:
+  push:
+    branches:
+      - main  # keep this at all times
+  pull_request:
+  schedule:
+    - cron: '0 0 * * *'  # nightly
+
+# Required shell entrypoint to have properly configured bash shell
+defaults:
+  run:
+    shell: bash -l {0}
+
+jobs:
+  linux-test:
+    runs-on: "ubuntu-latest"
+    strategy:
+      matrix:
+        python-version: ["3.9", "3.10", "3.11", "3.12"]
+      fail-fast: false
+    name: Linux Python ${{ matrix.python-version }}
+    steps:
+      - uses: actions/checkout@v3
+        with:
+          fetch-depth: 0
+      - uses: conda-incubator/setup-miniconda@v2
+        with:
+          python-version: ${{ matrix.python-version }}
+          miniforge-version: "latest"
+          miniforge-variant: Mambaforge
+          use-mamba: true
+      - name: Get conda and Python versions
+        run: |
+          conda --version
+          python -V
+      - name: Export proxy
+        run: |
+          echo 'USE_S3 = True' >> activestorage/config.py
+      - name: Ping remote Reductionist
+        run: curl -k https://192.171.169.248:8080/.well-known/reductionist-schema
+      - uses: conda-incubator/setup-miniconda@v2
+        with:
+          activate-environment: activestorage-minio
+          environment-file: environment.yml
+          python-version: ${{ matrix.python-version }}
+          miniforge-version: "latest"
+          miniforge-variant: Mambaforge
+          use-mamba: true
+      - name: Install PyActiveStorage
+        run: |
+          conda --version
+          python -V
+          which python
+          pip install -e .
+      - name: Run one single test
+        run: |
+          pytest tests/test_compression_remote_reductionist.py
+      - name: Upload HTML report artifact
+        uses: actions/upload-artifact@v3
+        with:
+          name: html-report
+          path: test-reports/
+        if: always()

activestorage/active.py (+82, -17)

@@ -20,7 +20,7 @@


 @contextlib.contextmanager
-def load_from_s3(uri):
+def load_from_s3(uri, storage_options=None):
     """
     Load a netCDF4-like object from S3.

@@ -34,10 +34,15 @@ def load_from_s3(uri):
     '<File-like object S3FileSystem, pyactivestorage/s3_test_bizarre.nc>'
     instead, we use h5netcdf: https://github.com/h5netcdf/h5netcdf
     a Python binder straight to HDF5-netCDF4 interface, that doesn't need a "local" file
+
+    storage_options: kwarg dict containing S3 credentials passed straight to Active
     """
-    fs = s3fs.S3FileSystem(key=S3_ACCESS_KEY,  # eg "minioadmin" for Minio
-                           secret=S3_SECRET_KEY,  # eg "minioadmin" for Minio
-                           client_kwargs={'endpoint_url': S3_URL})  # eg "http://localhost:9000" for Minio
+    if storage_options is None:  # use pre-configured S3 credentials
+        fs = s3fs.S3FileSystem(key=S3_ACCESS_KEY,  # eg "minioadmin" for Minio
+                               secret=S3_SECRET_KEY,  # eg "minioadmin" for Minio
+                               client_kwargs={'endpoint_url': S3_URL})  # eg "http://localhost:9000" for Minio
+    else:
+        fs = s3fs.S3FileSystem(**storage_options)  # use passed-in dictionary
     with fs.open(uri, 'rb') as s3file:
         ds = h5netcdf.File(s3file, 'r', invalid_netcdf=True)
         print(f"Dataset loaded from S3 via h5netcdf: {ds}")
@@ -68,20 +73,43 @@ def __new__(cls, *args, **kwargs):
         }
         return instance

-    def __init__(self, uri, ncvar, storage_type=None, max_threads=100):
+    def __init__(
+        self,
+        uri,
+        ncvar,
+        storage_type=None,
+        max_threads=100,
+        storage_options=None,
+        active_storage_url=None
+    ):
         """
         Instantiate with a NetCDF4 dataset and the variable of interest within that file.
         (We need the variable, because we need variable specific metadata from within that
         file, however, if that information is available at instantiation, it can be provided
         using keywords and avoid a metadata read.)
+
+        :param storage_options: s3fs.S3FileSystem options
+        :param active_storage_url: Reductionist server URL
         """
         # Assume NetCDF4 for now
         self.uri = uri
         if self.uri is None:
             raise ValueError(f"Must use a valid file for uri. Got {self.uri}")
+
+        # still allow for a passable storage_type
+        # for special cases eg "special-POSIX" ie DDN
+        if not storage_type and storage_options is not None:
+            storage_type = urllib.parse.urlparse(uri).scheme
         self.storage_type = storage_type
+
+        # get storage_options
+        self.storage_options = storage_options
+        self.active_storage_url = active_storage_url
+
+        # basic check on file
         if not os.path.isfile(self.uri) and not self.storage_type:
             raise ValueError(f"Must use existing file for uri. {self.uri} not found")
+
         self.ncvar = ncvar
         if self.ncvar is None:
             raise ValueError("Must set a netCDF variable name to slice")
@@ -107,13 +135,13 @@ def __getitem__(self, index):
         lock = self.lock
         if lock:
             lock.acquire()
-
+
         if self.storage_type is None:
             nc = Dataset(self.uri)
             data = nc[ncvar][index]
             nc.close()
         elif self.storage_type == "s3":
-            with load_from_s3(self.uri) as nc:
+            with load_from_s3(self.uri, self.storage_options) as nc:
                 data = nc[ncvar][index]
                 data = self._mask_data(data, nc[ncvar])

@@ -238,7 +266,8 @@ def _via_kerchunk(self, index):
             ds, zarray, zattrs = nz.load_netcdf_zarr_generic(
                 self.uri,
                 self.ncvar,
-                self.storage_type
+                self.storage_type,
+                self.storage_options,
             )
             # The following is a hangove from exploration
             # and is needed if using the original doing it ourselves
@@ -390,6 +419,20 @@ def _from_storage(self, stripped_indexer, drop_axes, out_shape, out_dtype,

         return out

+    def _get_endpoint_url(self):
+        """Return the endpoint_url of an S3 object store, or `None`"""
+        endpoint_url = self.storage_options.get('endpoint_url')
+        if endpoint_url is not None:
+            return endpoint_url
+
+        client_kwargs = self.storage_options.get('client_kwargs')
+        if client_kwargs:
+            endpoint_url = client_kwargs.get('endpoint_url')
+            if endpoint_url is not None:
+                return endpoint_url
+
+        return f"http://{urllib.parse.urlparse(self.filename).netloc}"
+
     def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts,
                        out_selection, compressor, filters, missing,
                        drop_axes=None):
@@ -406,22 +449,44 @@ def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts,
         key = f"{self.ncvar}/{coord}"
         rfile, offset, size = tuple(fsref[key])

+        # S3: pass in pre-configured storage options (credentials)
         if self.storage_type == "s3":
             parsed_url = urllib.parse.urlparse(rfile)
             bucket = parsed_url.netloc
             object = parsed_url.path
             # FIXME: We do not get the correct byte order on the Zarr Array's dtype
             # when using S3, so use the value captured earlier.
             dtype = self._dtype
-            tmp, count = reductionist.reduce_chunk(session, S3_ACTIVE_STORAGE_URL,
-                                                   S3_URL,
-                                                   bucket, object, offset,
-                                                   size, compressor, filters,
-                                                   missing, dtype,
-                                                   self.zds._chunks,
-                                                   self.zds._order,
-                                                   chunk_selection,
-                                                   operation=self._method)
+            if self.storage_options is None:
+                tmp, count = reductionist.reduce_chunk(session,
+                                                       S3_ACTIVE_STORAGE_URL,
+                                                       S3_URL,
+                                                       bucket, object, offset,
+                                                       size, compressor, filters,
+                                                       missing, dtype,
+                                                       self.zds._chunks,
+                                                       self.zds._order,
+                                                       chunk_selection,
+                                                       operation=self._method)
+            else:
+                # special case for "anon=True" buckets that work only with e.g.
+                # fs = s3fs.S3FileSystem(anon=True, client_kwargs={'endpoint_url': S3_URL})
+                # where file uri = bucketX/fileY.mc
+                print("S3 Storage options to Reductionist:", self.storage_options)
+                if self.storage_options.get("anon", None) == True:
+                    bucket = os.path.dirname(parsed_url.path)  # bucketX
+                    object = os.path.basename(parsed_url.path)  # fileY
+                    print("S3 anon=True Bucket and File:", bucket, object)
+                tmp, count = reductionist.reduce_chunk(session,
+                                                       self.active_storage_url,
+                                                       self._get_endpoint_url(),
+                                                       bucket, object, offset,
+                                                       size, compressor, filters,
+                                                       missing, dtype,
+                                                       self.zds._chunks,
+                                                       self.zds._order,
+                                                       chunk_selection,
+                                                       operation=self._method)
         else:
             # note there is an ongoing discussion about this interface, and what it returns
             # see https://github.com/valeriupredoi/PyActiveStorage/issues/33
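To make the anon=True branch above concrete, here is a tiny illustrative sketch of the bucket/object split it performs on a chunk reference; the path is a made-up example, not one from this repository.

# Illustrative only: the split performed when storage_options.get("anon") is True,
# for a chunk reference of the form "bucketX/fileY.nc" (made-up path).
import os
import urllib.parse

parsed_url = urllib.parse.urlparse("bucketX/fileY.nc")
bucket = os.path.dirname(parsed_url.path)        # "bucketX"
object_name = os.path.basename(parsed_url.path)  # "fileY.nc"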

activestorage/netcdf_to_zarr.py (+24, -4)

@@ -10,9 +10,10 @@
 from kerchunk.hdf import SingleHdf5ToZarr


-def gen_json(file_url, varname, outf, storage_type):
+def gen_json(file_url, varname, outf, storage_type, storage_options):
     """Generate a json file that contains the kerchunk-ed data for Zarr."""
-    if storage_type == "s3":
+    # S3 configuration presets
+    if storage_type == "s3" and storage_options is None:
         fs = s3fs.S3FileSystem(key=S3_ACCESS_KEY,
                                secret=S3_SECRET_KEY,
                                client_kwargs={'endpoint_url': S3_URL},
@@ -26,6 +27,21 @@ def gen_json(file_url, varname, outf, storage_type):
         with fs2.open(outf, 'wb') as f:
             content = h5chunks.translate()
             f.write(ujson.dumps(content).encode())
+
+    # S3 passed-in configuration
+    elif storage_type == "s3" and storage_options is not None:
+        storage_options = storage_options.copy()
+        storage_options['default_fill_cache'] = False
+        storage_options['default_cache_type'] = "none"
+        fs = s3fs.S3FileSystem(**storage_options)
+        fs2 = fsspec.filesystem('')
+        with fs.open(file_url, 'rb') as s3file:
+            h5chunks = SingleHdf5ToZarr(s3file, file_url,
+                                        inline_threshold=0)
+        with fs2.open(outf, 'wb') as f:
+            content = h5chunks.translate()
+            f.write(ujson.dumps(content).encode())
+    # not S3
     else:
         fs = fsspec.filesystem('')
         with fs.open(file_url, 'rb') as local_file:
@@ -77,13 +93,17 @@ def open_zarr_group(out_json, varname):
     return zarr_array


-def load_netcdf_zarr_generic(fileloc, varname, storage_type, build_dummy=True):
+def load_netcdf_zarr_generic(fileloc, varname, storage_type, storage_options, build_dummy=True):
     """Pass a netCDF4 file to be shaped as Zarr file by kerchunk."""
     print(f"Storage type {storage_type}")

     # Write the Zarr group JSON to a temporary file.
     with tempfile.NamedTemporaryFile() as out_json:
-        _, zarray, zattrs = gen_json(fileloc, varname, out_json.name, storage_type)
+        _, zarray, zattrs = gen_json(fileloc,
+                                     varname,
+                                     out_json.name,
+                                     storage_type,
+                                     storage_options)

     # open this monster
     print(f"Attempting to open and convert {fileloc}.")

activestorage/reductionist.py (+2, -1)

@@ -19,7 +19,7 @@ def get_session(username: str, password: str, cacert: typing.Optional[str]) -> requests.Session:
     """
     session = requests.Session()
     session.auth = (username, password)
-    session.verify = cacert or True
+    session.verify = cacert or False
     return session


@@ -53,6 +53,7 @@ def reduce_chunk(session, server, source, bucket, object,
     """

     request_data = build_request_data(source, bucket, object, offset, size, compression, filters, missing, dtype, shape, order, chunk_selection)
+    print("Reductionist request data dictionary:", request_data)
     api_operation = "sum" if operation == "mean" else operation or "select"
     url = f'{server}/v1/{api_operation}/'
     response = request(session, url, request_data)
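The get_session() change flips the default: when no CA bundle is supplied, TLS verification is now disabled rather than enabled, which is what lets the client reach a Reductionist behind a self-signed certificate (such as the remote instance pinged in the new workflow). A minimal sketch with placeholder credentials:

# Minimal sketch (placeholder credentials): with cacert=None the session ends up
# with verify=False, i.e. self-signed server certificates are accepted; pass a CA
# bundle path as cacert to re-enable verification.
from activestorage import reductionist

session = reductionist.get_session("minioadmin", "minioadmin", None)
print(session.verify)  # False after this change (previously True)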

tests/test_compression.py (+31, -4)

@@ -40,6 +40,27 @@ def create_compressed_dataset(tmp_path: str, compression: str, shuffle: bool):
     return test_data


+STORAGE_OPTIONS_CLASSIC = {
+    'key': S3_ACCESS_KEY,
+    'secret': S3_SECRET_KEY,
+    'client_kwargs': {'endpoint_url': S3_URL},
+}
+S3_ACTIVE_URL_MINIO = S3_ACTIVE_STORAGE_URL
+S3_ACTIVE_URL_Bryan = "https://192.171.169.248:8080"
+
+# TODO include all supported configuration types
+# so far test three possible configurations for storage_options:
+# - storage_options = None, active_storage_url = None (Minio and local Reductionist, preset credentials from config.py)
+# - storage_options = CLASSIC, active_storage_url = CLASSIC (Minio and local Reductionist, preset credentials from config.py but folded in storage_options and active_storage_url)
+# - storage_options = CLASSIC, active_storage_url = Bryan's machine (Minio BUT Reductionist moved on Bryan's machine)
+# (this invariably fails due to data URL being //localhost:9000 closed to outside Reductionist
+storage_options_paramlist = [
+    (None, None),
+    (STORAGE_OPTIONS_CLASSIC, S3_ACTIVE_URL_MINIO),
+    # (STORAGE_OPTIONS_CLASSIC, S3_ACTIVE_URL_Bryan)
+]
+
+
 @pytest.mark.parametrize('compression', ['zlib'])
 @pytest.mark.parametrize('shuffle', [False, True])
 def test_compression_and_filters(tmp_path: str, compression: str, shuffle: bool):
@@ -55,7 +76,8 @@ def test_compression_and_filters(tmp_path: str, compression: str, shuffle: bool):
     assert result == 740.0


-def test_compression_and_filters_cmip6_data():
+@pytest.mark.parametrize("storage_options, active_storage_url", storage_options_paramlist)
+def test_compression_and_filters_cmip6_data(storage_options, active_storage_url):
     """
     Test use of datasets with compression and filters applied for a real
     CMIP6 dataset (CMIP6_IPSL-CM6A-LR_tas).
@@ -69,15 +91,18 @@ def test_compression_and_filters_cmip6_data():

     check_dataset_filters(test_file, "tas", "zlib", False)

-    active = Active(test_file, 'tas', utils.get_storage_type())
+    active = Active(test_file, 'tas', utils.get_storage_type(),
+                    storage_options=storage_options,
+                    active_storage_url=active_storage_url)
     active._version = 1
     active._method = "min"
     result = active[0:2,4:6,7:9]
     assert nc_min == result
     assert result == 239.25946044921875


-def test_compression_and_filters_obs4mips_data():
+@pytest.mark.parametrize("storage_options, active_storage_url", storage_options_paramlist)
+def test_compression_and_filters_obs4mips_data(storage_options, active_storage_url):
     """
     Test use of datasets with compression and filters applied for a real
     obs4mips dataset (obs4MIPS_CERES-EBAF_L3B_Ed2-8_rlut.nc) at CMIP5 MIP standard
@@ -92,7 +117,9 @@ def test_compression_and_filters_obs4mips_data():

     check_dataset_filters(test_file, "rlut", "zlib", False)

-    active = Active(test_file, 'rlut', utils.get_storage_type())
+    active = Active(test_file, 'rlut', utils.get_storage_type(),
+                    storage_options=storage_options,
+                    active_storage_url=active_storage_url)
     active._version = 1
     active._method = "min"
     result = active[0:2,4:6,7:9]
