Allow for externally-supplied storage_options to S3 loader and further down to Reductionist #182

Merged (30 commits, Feb 14, 2024)

Commits
86c6fab start work on active.py (valeriupredoi, Feb 12, 2024)
faad584 finished with active.py (valeriupredoi, Feb 12, 2024)
4dfbd3c finish with active.py also bugfixing (valeriupredoi, Feb 12, 2024)
7f3ed0f correct mock object (valeriupredoi, Feb 12, 2024)
f1264f3 adapt netcdf to zarr (valeriupredoi, Feb 13, 2024)
853a42f fix call to netcdf to zarr (valeriupredoi, Feb 13, 2024)
fc318f8 fix s3 unit test (valeriupredoi, Feb 13, 2024)
dd8442d storage optinos parametrize test compression (valeriupredoi, Feb 13, 2024)
cdb1c04 blergh forgot about active_storage_url (valeriupredoi, Feb 13, 2024)
0c96f34 set cacert to False if None (valeriupredoi, Feb 13, 2024)
515dd0f add special GA workflow for remote reductionist tests (valeriupredoi, Feb 13, 2024)
c8f3b9a set cacert test assertion (valeriupredoi, Feb 13, 2024)
f6793e9 add CMIP6 file that also lives on Bryan's S3 machine (valeriupredoi, Feb 13, 2024)
779c7d4 add special test to test remote reductionist (valeriupredoi, Feb 13, 2024)
d51a98a correct test remote reductionist (valeriupredoi, Feb 13, 2024)
91cdac4 actually name the blithering file correctly bc Bryan can't copy a fil… (valeriupredoi, Feb 13, 2024)
163d6ca rename name of test in GA wkflow (valeriupredoi, Feb 13, 2024)
c5ea57a always print an instance of request data dict from Reductionist (valeriupredoi, Feb 13, 2024)
8613533 handle special case of anon=True bucket (valeriupredoi, Feb 13, 2024)
cb6f1a5 force run s3 exploratory tests (valeriupredoi, Feb 13, 2024)
c158e1e added some comments (valeriupredoi, Feb 13, 2024)
56547ce check specifically for storage options None (valeriupredoi, Feb 14, 2024)
c37694e check specifically for storage options None (valeriupredoi, Feb 14, 2024)
6f3868e add to testing (valeriupredoi, Feb 14, 2024)
addec91 add failsafe test for anon=True buckets (valeriupredoi, Feb 14, 2024)
1ec8397 finally, catch and pass exception of Access Denied (valeriupredoi, Feb 14, 2024)
ccdf8c7 use remote Bryan Reductionist and Minio (valeriupredoi, Feb 14, 2024)
96480b2 bloody missing comma (valeriupredoi, Feb 14, 2024)
dd9ef28 ah minio ya bassard (valeriupredoi, Feb 14, 2024)
8812593 comment out test case (valeriupredoi, Feb 14, 2024)
Changes from all commits
1 change: 1 addition & 0 deletions .github/workflows/test_s3_minio.yml
@@ -69,6 +69,7 @@ jobs:
- name: Run S3 exploratory tests
run: |
pytest tests/s3_exploratory/test_s3_reduction.py --html=test-reports/s3-exploratory-report.html
if: always()
- name: Install pytest-monitor
run: pip install pytest-monitor
- name: Run S3 performance tests
68 changes: 68 additions & 0 deletions .github/workflows/test_s3_remote_reductionist.yml
@@ -0,0 +1,68 @@
# adapted GA workflow from https://github.com/stackhpc/reductionist-rs
# This runs Active with a remote Reductionist and S3 data stored elsewhere
---
name: S3/Remote Reductionist

on:
push:
branches:
- main # keep this at all times
pull_request:
schedule:
- cron: '0 0 * * *' # nightly

# Required shell entrypoint to have properly configured bash shell
defaults:
run:
shell: bash -l {0}

jobs:
linux-test:
runs-on: "ubuntu-latest"
strategy:
matrix:
python-version: ["3.9", "3.10", "3.11", "3.12"]
fail-fast: false
name: Linux Python ${{ matrix.python-version }}
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
- uses: conda-incubator/setup-miniconda@v2
with:
python-version: ${{ matrix.python-version }}
miniforge-version: "latest"
miniforge-variant: Mambaforge
use-mamba: true
- name: Get conda and Python versions
run: |
conda --version
python -V
- name: Export proxy
run: |
echo 'USE_S3 = True' >> activestorage/config.py
- name: Ping remote Reductionist
run: curl -k https://192.171.169.248:8080/.well-known/reductionist-schema
- uses: conda-incubator/setup-miniconda@v2
with:
activate-environment: activestorage-minio
environment-file: environment.yml
python-version: ${{ matrix.python-version }}
miniforge-version: "latest"
miniforge-variant: Mambaforge
use-mamba: true
- name: Install PyActiveStorage
run: |
conda --version
python -V
which python
pip install -e .
- name: Run one single test
run: |
pytest tests/test_compression_remote_reductionist.py
- name: Upload HTML report artifact
uses: actions/upload-artifact@v3
with:
name: html-report
path: test-reports/
if: always()
99 changes: 82 additions & 17 deletions activestorage/active.py
@@ -20,7 +20,7 @@


@contextlib.contextmanager
def load_from_s3(uri):
def load_from_s3(uri, storage_options=None):
"""
Load a netCDF4-like object from S3.

@@ -34,10 +34,15 @@
'<File-like object S3FileSystem, pyactivestorage/s3_test_bizarre.nc>'
instead, we use h5netcdf: https://github.com/h5netcdf/h5netcdf
a Python binding straight to the HDF5-netCDF4 interface that doesn't need a "local" file

storage_options: kwarg dict containing S3 credentials, passed in from Active
"""
fs = s3fs.S3FileSystem(key=S3_ACCESS_KEY, # eg "minioadmin" for Minio
secret=S3_SECRET_KEY, # eg "minioadmin" for Minio
client_kwargs={'endpoint_url': S3_URL}) # eg "http://localhost:9000" for Minio
if storage_options is None: # use pre-configured S3 credentials
fs = s3fs.S3FileSystem(key=S3_ACCESS_KEY, # eg "minioadmin" for Minio
secret=S3_SECRET_KEY, # eg "minioadmin" for Minio
client_kwargs={'endpoint_url': S3_URL}) # eg "http://localhost:9000" for Minio
else:
fs = s3fs.S3FileSystem(**storage_options) # use passed-in dictionary
with fs.open(uri, 'rb') as s3file:
ds = h5netcdf.File(s3file, 'r', invalid_netcdf=True)
print(f"Dataset loaded from S3 via h5netcdf: {ds}")
@@ -68,20 +73,43 @@
}
return instance

def __init__(self, uri, ncvar, storage_type=None, max_threads=100):
def __init__(
self,
uri,
ncvar,
storage_type=None,
max_threads=100,
storage_options=None,
active_storage_url=None
):
"""
Instantiate with a NetCDF4 dataset and the variable of interest within that file.
(We need the variable, because we need variable specific metadata from within that
file, however, if that information is available at instantiation, it can be provided
using keywords and avoid a metadata read.)

:param storage_options: s3fs.S3FileSystem options
:param active_storage_url: Reductionist server URL
"""
# Assume NetCDF4 for now
self.uri = uri
if self.uri is None:
raise ValueError(f"Must use a valid file for uri. Got {self.uri}")

# still allow for a passable storage_type
# for special cases eg "special-POSIX" ie DDN
if not storage_type and storage_options is not None:
storage_type = urllib.parse.urlparse(uri).scheme
self.storage_type = storage_type

# get storage_options
self.storage_options = storage_options
self.active_storage_url = active_storage_url

# basic check on file
if not os.path.isfile(self.uri) and not self.storage_type:
raise ValueError(f"Must use existing file for uri. {self.uri} not found")

self.ncvar = ncvar
if self.ncvar is None:
raise ValueError("Must set a netCDF variable name to slice")
@@ -107,13 +135,13 @@
lock = self.lock
if lock:
lock.acquire()

if self.storage_type is None:
nc = Dataset(self.uri)
data = nc[ncvar][index]
nc.close()
elif self.storage_type == "s3":
with load_from_s3(self.uri) as nc:
with load_from_s3(self.uri, self.storage_options) as nc:
data = nc[ncvar][index]
data = self._mask_data(data, nc[ncvar])

@@ -238,7 +266,8 @@
ds, zarray, zattrs = nz.load_netcdf_zarr_generic(
self.uri,
self.ncvar,
self.storage_type
self.storage_type,
self.storage_options,
)
# The following is a hangover from exploration
# and is needed if using the original doing it ourselves
@@ -390,6 +419,20 @@

return out

def _get_endpoint_url(self):
"""Return the endpoint_url of an S3 object store, or `None`"""
endpoint_url = self.storage_options.get('endpoint_url')
if endpoint_url is not None:
return endpoint_url

client_kwargs = self.storage_options.get('client_kwargs')
if client_kwargs:
endpoint_url = client_kwargs.get('endpoint_url')
if endpoint_url is not None:
return endpoint_url

return f"http://{urllib.parse.urlparse(self.filename).netloc}"

def _process_chunk(self, session, fsref, chunk_coords, chunk_selection, counts,
out_selection, compressor, filters, missing,
drop_axes=None):
@@ -406,22 +449,44 @@
key = f"{self.ncvar}/{coord}"
rfile, offset, size = tuple(fsref[key])

# S3: pass in pre-configured storage options (credentials)
if self.storage_type == "s3":
parsed_url = urllib.parse.urlparse(rfile)
bucket = parsed_url.netloc
object = parsed_url.path
# FIXME: We do not get the correct byte order on the Zarr Array's dtype
# when using S3, so use the value captured earlier.
dtype = self._dtype
tmp, count = reductionist.reduce_chunk(session, S3_ACTIVE_STORAGE_URL,
S3_URL,
bucket, object, offset,
size, compressor, filters,
missing, dtype,
self.zds._chunks,
self.zds._order,
chunk_selection,
operation=self._method)
if self.storage_options is None:
tmp, count = reductionist.reduce_chunk(session,
S3_ACTIVE_STORAGE_URL,
S3_URL,
bucket, object, offset,
size, compressor, filters,
missing, dtype,
self.zds._chunks,
self.zds._order,
chunk_selection,
operation=self._method)
else:
# special case for "anon=True" buckets that work only with e.g.
# fs = s3fs.S3FileSystem(anon=True, client_kwargs={'endpoint_url': S3_URL})
# where file uri = bucketX/fileY.mc
print("S3 Storage options to Reductionist:", self.storage_options)
if self.storage_options.get("anon", None) == True:
bucket = os.path.dirname(parsed_url.path) # bucketX
object = os.path.basename(parsed_url.path) # fileY
print("S3 anon=True Bucket and File:", bucket, object)
tmp, count = reductionist.reduce_chunk(session,
self.active_storage_url,
self._get_endpoint_url(),
bucket, object, offset,
size, compressor, filters,
missing, dtype,
self.zds._chunks,
self.zds._order,
chunk_selection,
operation=self._method)
else:
# note there is an ongoing discussion about this interface, and what it returns
# see https://github.com/valeriupredoi/PyActiveStorage/issues/33
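A sketch of the anon=True path described in the comments above, where the URI has the bare bucketX/fileY form (bucket, endpoint and Reductionist URL are placeholders):

from activestorage.active import Active

active = Active(
    "bucket-name/file.nc",           # public bucket, bare bucketX/fileY form
    "tas",
    storage_type="s3",
    storage_options={
        "anon": True,
        "client_kwargs": {"endpoint_url": "https://objectstore.example"},
    },
    active_storage_url="https://reductionist.example:8080",
)

In that branch the bucket and object are recovered with os.path.dirname/basename rather than from the URL's netloc.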
28 changes: 24 additions & 4 deletions activestorage/netcdf_to_zarr.py
@@ -10,9 +10,10 @@
from kerchunk.hdf import SingleHdf5ToZarr


def gen_json(file_url, varname, outf, storage_type):
def gen_json(file_url, varname, outf, storage_type, storage_options):
"""Generate a json file that contains the kerchunk-ed data for Zarr."""
if storage_type == "s3":
# S3 configuration presets
if storage_type == "s3" and storage_options is None:
fs = s3fs.S3FileSystem(key=S3_ACCESS_KEY,
secret=S3_SECRET_KEY,
client_kwargs={'endpoint_url': S3_URL},
@@ -26,6 +27,21 @@ def gen_json(file_url, varname, outf, storage_type):
with fs2.open(outf, 'wb') as f:
content = h5chunks.translate()
f.write(ujson.dumps(content).encode())

# S3 passed-in configuration
elif storage_type == "s3" and storage_options is not None:
storage_options = storage_options.copy()
storage_options['default_fill_cache'] = False
storage_options['default_cache_type'] = "none"
fs = s3fs.S3FileSystem(**storage_options)
fs2 = fsspec.filesystem('')
with fs.open(file_url, 'rb') as s3file:
h5chunks = SingleHdf5ToZarr(s3file, file_url,
inline_threshold=0)
with fs2.open(outf, 'wb') as f:
content = h5chunks.translate()
f.write(ujson.dumps(content).encode())
# not S3
else:
fs = fsspec.filesystem('')
with fs.open(file_url, 'rb') as local_file:
@@ -77,13 +93,17 @@ def open_zarr_group(out_json, varname):
return zarr_array


def load_netcdf_zarr_generic(fileloc, varname, storage_type, build_dummy=True):
def load_netcdf_zarr_generic(fileloc, varname, storage_type, storage_options, build_dummy=True):
"""Pass a netCDF4 file to be shaped as Zarr file by kerchunk."""
print(f"Storage type {storage_type}")

# Write the Zarr group JSON to a temporary file.
with tempfile.NamedTemporaryFile() as out_json:
_, zarray, zattrs = gen_json(fileloc, varname, out_json.name, storage_type)
_, zarray, zattrs = gen_json(fileloc,
varname,
out_json.name,
storage_type,
storage_options)

# open this monster
print(f"Attempting to open and convert {fileloc}.")
3 changes: 2 additions & 1 deletion activestorage/reductionist.py
@@ -19,7 +19,7 @@ def get_session(username: str, password: str, cacert: typing.Optional[str]) -> r
"""
session = requests.Session()
session.auth = (username, password)
session.verify = cacert or True
session.verify = cacert or False
return session
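With this change, a missing CA certificate now disables TLS verification instead of enabling it, which is what a remote Reductionist behind a self-signed certificate needs (compare the curl -k ping in the new workflow). A minimal sketch:

from activestorage.reductionist import get_session

session = get_session("user", "password", cacert=None)
assert session.verify is False   # was True before this change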


@@ -53,6 +53,7 @@ def reduce_chunk(session, server, source, bucket, object,
"""

request_data = build_request_data(source, bucket, object, offset, size, compression, filters, missing, dtype, shape, order, chunk_selection)
print("Reductionist request data dictionary:", request_data)
api_operation = "sum" if operation == "mean" else operation or "select"
url = f'{server}/v1/{api_operation}/'
response = request(session, url, request_data)
35 changes: 31 additions & 4 deletions tests/test_compression.py
@@ -40,6 +40,27 @@ def create_compressed_dataset(tmp_path: str, compression: str, shuffle: bool):
return test_data


STORAGE_OPTIONS_CLASSIC = {
'key': S3_ACCESS_KEY,
'secret': S3_SECRET_KEY,
'client_kwargs': {'endpoint_url': S3_URL},
}
S3_ACTIVE_URL_MINIO = S3_ACTIVE_STORAGE_URL
S3_ACTIVE_URL_Bryan = "https://192.171.169.248:8080"

# TODO include all supported configuration types
# so far test three possible configurations for storage_options:
# - storage_options = None, active_storage_url = None (Minio and local Reductionist, preset credentials from config.py)
# - storage_options = CLASSIC, active_storage_url = CLASSIC (Minio and local Reductionist, preset credentials from config.py but folded in storage_options and active_storage_url)
# - storage_options = CLASSIC, active_storage_url = Bryan's machine (Minio BUT Reductionist moved on Bryan's machine)
# (this invariably fails because the data URL //localhost:9000 is not reachable from the outside Reductionist)
storage_options_paramlist = [
(None, None),
(STORAGE_OPTIONS_CLASSIC, S3_ACTIVE_URL_MINIO),
# (STORAGE_OPTIONS_CLASSIC, S3_ACTIVE_URL_Bryan)
]


@pytest.mark.parametrize('compression', ['zlib'])
@pytest.mark.parametrize('shuffle', [False, True])
def test_compression_and_filters(tmp_path: str, compression: str, shuffle: bool):
@@ -55,7 +76,8 @@ def test_compression_and_filters(tmp_path: str, compression: str, shuffle: bool)
assert result == 740.0


def test_compression_and_filters_cmip6_data():
@pytest.mark.parametrize("storage_options, active_storage_url", storage_options_paramlist)
def test_compression_and_filters_cmip6_data(storage_options, active_storage_url):
"""
Test use of datasets with compression and filters applied for a real
CMIP6 dataset (CMIP6_IPSL-CM6A-LR_tas).
@@ -69,15 +91,18 @@ def test_compression_and_filters_cmip6_data():

check_dataset_filters(test_file, "tas", "zlib", False)

active = Active(test_file, 'tas', utils.get_storage_type())
active = Active(test_file, 'tas', utils.get_storage_type(),
storage_options=storage_options,
active_storage_url=active_storage_url)
active._version = 1
active._method = "min"
result = active[0:2,4:6,7:9]
assert nc_min == result
assert result == 239.25946044921875


def test_compression_and_filters_obs4mips_data():
@pytest.mark.parametrize("storage_options, active_storage_url", storage_options_paramlist)
def test_compression_and_filters_obs4mips_data(storage_options, active_storage_url):
"""
Test use of datasets with compression and filters applied for a real
obs4mips dataset (obs4MIPS_CERES-EBAF_L3B_Ed2-8_rlut.nc) at CMIP5 MIP standard
@@ -92,7 +117,9 @@ def test_compression_and_filters_obs4mips_data():

check_dataset_filters(test_file, "rlut", "zlib", False)

active = Active(test_file, 'rlut', utils.get_storage_type())
active = Active(test_file, 'rlut', utils.get_storage_type(),
storage_options=storage_options,
active_storage_url=active_storage_url)
active._version = 1
active._method = "min"
result = active[0:2,4:6,7:9]