Enable https reduction (off NGINX server only) and auto-detect storage_type #245

Merged: 32 commits, merged on Mar 20, 2025

Commits (32)
42c5cd4
start implementing https functionality
valeriupredoi Mar 10, 2025
3c3325f
add dedicated https test module
valeriupredoi Mar 10, 2025
b0c9684
working test with local reduction
valeriupredoi Mar 10, 2025
61bf171
first working https prototype
valeriupredoi Mar 10, 2025
3f9d619
openhttps file
valeriupredoi Mar 10, 2025
2fd00f0
test for actual https file
valeriupredoi Mar 10, 2025
c1d7336
add checker s3 func for uri
valeriupredoi Mar 11, 2025
00c47e0
auto-detect storage type
valeriupredoi Mar 11, 2025
33dae93
add two auto-storage type tests
valeriupredoi Mar 11, 2025
16fdb0a
account for s3 gubbins
valeriupredoi Mar 11, 2025
fde60eb
correct test
valeriupredoi Mar 11, 2025
332b599
add test case for https and reductionist
valeriupredoi Mar 11, 2025
4116abc
add test case for https and reductionist
valeriupredoi Mar 11, 2025
70dc84e
shorten line
valeriupredoi Mar 11, 2025
c5f4d62
add extra kwarg to request_data
valeriupredoi Mar 12, 2025
bd8b617
correct test in light of this
valeriupredoi Mar 12, 2025
2acfd47
run GHA with -n 2
valeriupredoi Mar 12, 2025
3d2d1fe
add to real s3 test case
valeriupredoi Mar 12, 2025
6c251a1
add note docstring
valeriupredoi Mar 12, 2025
2b4eef1
skip a couple longer tests and add test
valeriupredoi Mar 12, 2025
05baa1b
add corner case
valeriupredoi Mar 12, 2025
e259ace
test corner case
valeriupredoi Mar 12, 2025
9ef3e4d
add yet another corner case
valeriupredoi Mar 12, 2025
45cf1a7
tests for that
valeriupredoi Mar 12, 2025
6a450c7
Merge branch 'pyfive' into enable_http_reduction
valeriupredoi Mar 13, 2025
280f70d
fix missing kwarg
valeriupredoi Mar 13, 2025
fe057f7
Merge branch 'main' into enable_http_reduction
valeriupredoi Mar 14, 2025
aaf5e9e
cleanup wkfls and add pyfive installation for conda lock install flow
valeriupredoi Mar 14, 2025
f43a430
unrun GHA
valeriupredoi Mar 14, 2025
b0285b4
Merge branch 'main' into enable_http_reduction
valeriupredoi Mar 14, 2025
7cd1567
GHA run all pythons for s3 only at merge
valeriupredoi Mar 14, 2025
7891742
GHA s3 test one Python only
valeriupredoi Mar 14, 2025
7 changes: 7 additions & 0 deletions .github/workflows/install-from-condalock-file.yml
@@ -37,6 +37,13 @@ jobs:
      - run: which python
      - run: python -V
      - run: conda create --name activestorage-fromlock --file conda-linux-64.lock
      - name: Install development version of NCAS-CMS/Pyfive:wacasoft
        run: |
          cd ..
          git clone https://github.com/NCAS-CMS/pyfive.git
          cd pyfive
          git checkout wacasoft
          pip install -e .
      - run: which python
      - run: pip --version
      - run: pip install -e .
85 changes: 85 additions & 0 deletions .github/workflows/run-s3-test-push.yml
@@ -0,0 +1,85 @@
# adapted GA workflow from https://github.com/stackhpc/reductionist-rs
---
name: S3/Minio Test Latest Python

on:
  push:

# Required shell entrypoint to have properly configured bash shell
defaults:
  run:
    shell: bash -l {0}

jobs:
  linux-test:
    runs-on: "ubuntu-latest"
    strategy:
      matrix:
        python-version: ["3.13"]
      fail-fast: false
    name: Linux Python ${{ matrix.python-version }}
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0
      - uses: conda-incubator/setup-miniconda@v3
        with:
          python-version: ${{ matrix.python-version }}
          miniforge-version: "latest"
          use-mamba: true
          mamba-version: "2.0.5"  # https://github.com/conda-incubator/setup-miniconda/issues/392
      - name: Get conda and Python versions
        run: |
          conda --version
          python -V
      - name: Export proxy
        run: |
          echo 'USE_S3 = True' >> activestorage/config.py
      - name: Start minio object storage
        run: tests/s3_exploratory/minio_scripts/minio-start
      - name: Wait for minio object storage to start
        run: |
          until curl -if http://localhost:9001; do
            sleep 1;
          done
      - name: Run Reductionist container
        run: docker run -it --detach --rm --net=host --name reductionist ghcr.io/stackhpc/reductionist-rs:latest
      - uses: conda-incubator/setup-miniconda@v3
        with:
          activate-environment: activestorage-minio
          environment-file: environment.yml
          python-version: ${{ matrix.python-version }}
          miniforge-version: "latest"
          use-mamba: true
          mamba-version: "2.0.5"  # https://github.com/conda-incubator/setup-miniconda/issues/392
      - name: Install development version of NCAS-CMS/Pyfive:wacasoft
        run: |
          cd ..
          git clone https://github.com/NCAS-CMS/pyfive.git
          cd pyfive
          git checkout wacasoft
          pip install -e .
      - name: Install PyActiveStorage
        run: |
          conda --version
          python -V
          which python
          pip install -e .
      - name: Run tests
        run: |
          pytest -n 2
      - name: Run S3 exploratory tests
        run: |
          pytest tests/s3_exploratory/test_s3_reduction.py --html=test-reports/s3-exploratory-report.html
        if: always()
      - name: Install pytest-monitor
        run: pip install pytest-monitor
      - name: Run S3 performance tests
        run: |
          pytest tests/s3_exploratory/test_s3_arrange_files.py
          pytest tests/s3_exploratory/test_s3_performance.py --db ../.pymon
      - name: Analyze S3 and local test performance
        run: python tests/s3_exploratory/parse_pymon.py
      - name: Stop minio object storage
        run: tests/s3_exploratory/minio_scripts/minio-stop
        if: always()
3 changes: 1 addition & 2 deletions .github/workflows/run-tests.yml
@@ -4,7 +4,6 @@ on:
  push:
    branches:
      - main
      - pyfive
  schedule:
    - cron: '0 0 * * *' # nightly

@@ -78,4 +77,4 @@ jobs:
      - run: conda list
      - run: mamba install -c conda-forge git
      - run: pip install -e .
      - run: pytest
      - run: pytest -n 2
6 changes: 2 additions & 4 deletions .github/workflows/test_s3_minio.yml
@@ -5,9 +5,7 @@ name: S3/Minio Test
on:
  push:
    branches:
      - main # keep this at all times
      - pyfive
  pull_request:
      - main
  schedule:
    - cron: '0 0 * * *' # nightly

@@ -73,7 +71,7 @@ jobs:
          pip install -e .
      - name: Run tests
        run: |
          pytest
          pytest -n 2
      - name: Run S3 exploratory tests
        run: |
          pytest tests/s3_exploratory/test_s3_reduction.py --html=test-reports/s3-exploratory-report.html
91 changes: 90 additions & 1 deletion activestorage/active.py
@@ -1,9 +1,11 @@
import concurrent.futures
import os
import fsspec
import numpy as np
import pathlib
import urllib
import pyfive
import requests
import s3fs
import time

@@ -17,6 +19,33 @@
from activestorage.hdf2numcodec import decode_filters


def return_storage_type(uri):
    """
    Extract the gateway-protocol header to infer the type of storage.
    """
    try:
        resp = requests.head(uri)
    except requests.exceptions.MissingSchema:  # eg local file
        return
    except requests.exceptions.InvalidSchema:  # eg Minio file s3://pyactivestorage/common_cl_a.nc
        if not uri.startswith("s3:"):
            return
        else:
            return "s3"
    except requests.exceptions.ConnectionError as exc:  # eg invalid link or offline
        print(exc)
        return
    response = resp.headers

    # https files on NGINX don't have "gateway-protocol" key
    if "gateway-protocol" in response:
        if response["gateway-protocol"] == "s3":
            print("Gateway protocol indicates S3 storage.")
            return "s3"
    else:
        return "https"
valeriupredoi (Collaborator, Author) commented:
@bnlawrence this is the bit I was on about on the call
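
For orientation, a minimal usage sketch of the new helper; the URIs are illustrative, the https case assumes the server is reachable, and return_storage_type is assumed importable from activestorage.active as added in this diff.

from activestorage.active import return_storage_type

# s3:// URI: requests.head() raises InvalidSchema, so the "s3:" prefix decides
print(return_storage_type("s3://pyactivestorage/common_cl_a.nc"))      # -> "s3"

# plain https file served by NGINX: headers carry no "gateway-protocol" key
print(return_storage_type("https://example.org/data/common_cl_a.nc"))  # -> "https"

# local POSIX path: requests.head() raises MissingSchema
print(return_storage_type("/tmp/common_cl_a.nc"))                      # -> None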

A collaborator replied:
That's great. I think we'd need to put the response to that in the json going to reductionist, rather than in the URI itself.
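
A minimal sketch of the request body that build_request_data() would then assemble (see the activestorage/reductionist.py changes below); the values are illustrative only, and Reductionist currently rejects the extra storage_type key, as the error quoted in the https test branch of active.py shows.

# Illustrative only: field names mirror build_request_data() in
# activestorage/reductionist.py; the URL, offset and size are made up.
request_data = {
    "source": "https://example.org/data/common_cl_a.nc",  # the https file itself
    "bucket": "https",        # dummy bucket, unused for https reads
    "object": "https://example.org/data/common_cl_a.nc",
    "dtype": "float32",
    "offset": 29956,
    "size": 8064,
    "order": "C",
    "storage_type": "https",  # the proposed extra field
}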



def load_from_s3(uri, storage_options=None):
    """
    Load a netCDF4-like object from S3.
@@ -50,6 +79,19 @@ def load_from_s3(uri, storage_options=None):
    return ds


def load_from_https(uri):
    """
    Load a pyfive.high_level.Dataset from a
    netCDF4 file on an https server (NGINX).
    """
    # TODO need to test if NGINX server behind https://
    fs = fsspec.filesystem('http')
    http_file = fs.open(uri, 'rb')
    ds = pyfive.File(http_file)
    print(f"Dataset loaded from https with Pyfive: {uri}")
    return ds


def get_missing_attributes(ds):
    """"
    Load all the missing attributes we need from a netcdf file
@@ -140,6 +182,24 @@ def __init__(
        self.ds = dataset
        self.uri = dataset

        # determine the storage_type
        # based on what we have available
        if not storage_type:
            if not input_variable:
                check_uri = self.uri
            else:
                check_uri = self.ds.id._filename

            # "special" case when we have to deal
            # with storage_options['client_kwargs']["endpoint_url"]
            if storage_options is not None and 'client_kwargs' in storage_options:
                if "endpoint_url" in storage_options['client_kwargs']:
                    base_url = storage_options['client_kwargs']["endpoint_url"]
                    if not input_variable:
                        check_uri = os.path.join(base_url, self.uri)
                    else:
                        check_uri = os.path.join(base_url, self.ds.id._filename)
            storage_type = return_storage_type(check_uri)

        # still allow for a passable storage_type
        # for special cases eg "special-POSIX" ie DDN
@@ -152,6 +212,8 @@ def __init__(
            self.filename = self.ds
        elif input_variable and self.storage_type == "s3":
            self.filename = self.ds.id._filename
        elif input_variable and self.storage_type == "https":
            self.filename = self.ds

        # get storage_options
        self.storage_options = storage_options
@@ -198,6 +260,8 @@ def __load_nc_file(self):
            nc = pyfive.File(self.uri)
        elif self.storage_type == "s3":
            nc = load_from_s3(self.uri, self.storage_options)
        elif self.storage_type == "https":
            nc = load_from_https(self.uri)
        self.filename = self.uri
        self.ds = nc[ncvar]

@@ -518,7 +582,7 @@ def _process_chunk(self, session, ds, chunks, chunk_coords, chunk_selection,
                method=self.method
            )

        elif self.storage_type == "s3" and self._version==2:
        elif self.storage_type == "s3" and self._version == 2:
            # S3: pass in pre-configured storage options (credentials)
            # print("S3 rfile is:", self.filename)
            parsed_url = urllib.parse.urlparse(self.filename)
@@ -567,6 +631,31 @@ def _process_chunk(self, session, ds, chunks, chunk_coords, chunk_selection,
                chunk_selection,
                axis,
                operation=self._method)
        # this is for testing ONLY until Reductionist is able to handle https
        # located files; after that, we can pipe any regular https file through
        # to Reductionist, provided the https server is "closer" to Reductionist
        elif self.storage_type == "https" and self._version == 2:
            # build a simple session
            session = requests.Session()
            session.auth = (None, None)
            session.verify = False
            bucket = "https"  # really doesn't matter

            # note the extra "storage_type" kwarg
            # this currently makes Reductionist throw a wobbly
            # E activestorage.reductionist.ReductionistError: Reductionist error: HTTP 400:
            #   {"error": {"message": "request data is not valid", "caused_by":
            #   ["Failed to deserialize the JSON body into the target type",
            #   "storage_type: unknown field `storage_type`, expected one of `source`, `bucket`,
            #   `object`, `dtype`, `byte_order`, `offset`, `size`, `shape`, `order`, `selection`,
            #   `compression`, `filters`, `missing` at line 1 column 550"]}}
            tmp, count = reductionist.reduce_chunk(session,
                                                   "https://192.171.169.113:8080",
                                                   self.filename,
                                                   bucket, self.filename, offset,
                                                   size, compressor, filters,
                                                   self.missing, np.dtype(ds.dtype),
                                                   chunks,
                                                   ds._order,
                                                   chunk_selection,
                                                   axis,
                                                   operation=self._method,
                                                   storage_type="https")
        elif self.storage_type=='ActivePosix' and self.version==2:
            # This is where the DDN Fuse and Infinia wrappers go
            raise NotImplementedError
10 changes: 7 additions & 3 deletions activestorage/reductionist.py
@@ -29,7 +29,7 @@ def get_session(username: str, password: str, cacert: typing.Optional[str]) -> r

def reduce_chunk(session, server, source, bucket, object,
                 offset, size, compression, filters, missing, dtype, shape,
                 order, chunk_selection, axis, operation):
                 order, chunk_selection, axis, operation, storage_type=None):
    """Perform a reduction on a chunk using Reductionist.
    :param server: Reductionist server URL
@@ -53,11 +53,14 @@ def reduce_chunk(session, server, source, bucket, object,
        obtained or operated upon.
    :param axis: tuple of the axes to be reduced (non-negative integers)
    :param operation: name of operation to perform
    :param storage_type: optional testing flag to allow HTTPS reduction
    :returns: the reduced data as a numpy array or scalar
    :raises ReductionistError: if the request to Reductionist fails
    """

    request_data = build_request_data(source, bucket, object, offset, size, compression, filters, missing, dtype, shape, order, chunk_selection, axis)
    request_data = build_request_data(source, bucket, object, offset, size, compression,
                                      filters, missing, dtype, shape, order, chunk_selection,
                                      axis, storage_type=storage_type)
    if DEBUG:
        print(f"Reductionist request data dictionary: {request_data}")
    api_operation = "sum" if operation == "mean" else operation or "select"
@@ -137,7 +140,7 @@ def encode_missing(missing):

def build_request_data(source: str, bucket: str, object: str, offset: int,
                       size: int, compression, filters, missing, dtype, shape,
                       order, selection, axis) -> dict:
                       order, selection, axis, storage_type=None) -> dict:
    """Build request data for Reductionist API."""
    request_data = {
        'source': source,
@@ -148,6 +151,7 @@ def build_request_data(source: str, bucket: str, object: str, offset: int,
        'offset': int(offset),
        'size': int(size),
        'order': order,
        'storage_type': storage_type,
    }
    if shape:
        request_data["shape"] = shape
40 changes: 28 additions & 12 deletions activestorage/storage.py
@@ -1,4 +1,5 @@
"""Active storage module."""
import fsspec
import numpy as np
import pyfive

@@ -35,18 +36,33 @@ def reduce_chunk(rfile,
        #FIXME: for the moment, open the file every time ... we might want to do that, or not
        # we could just use an instance of pyfive.high_level.Dataset.id
        # passed directly from active.py, as below
        with open(rfile,'rb') as open_file:
            # get the data
            chunk = read_block(open_file, offset, size)
            # reverse any compression and filters
            chunk = filter_pipeline(chunk, compression, filters)
            # make it a numpy array of bytes
            chunk = ensure_ndarray(chunk)
            # convert to the appropriate data type
            chunk = chunk.view(dtype)
            # sort out ordering and convert to the parent hyperslab dimensions
            chunk = chunk.reshape(-1, order='A')
            chunk = chunk.reshape(shape, order=order)
        try:
            with open(rfile,'rb') as open_file:
                # get the data
                chunk = read_block(open_file, offset, size)
                # reverse any compression and filters
                chunk = filter_pipeline(chunk, compression, filters)
                # make it a numpy array of bytes
                chunk = ensure_ndarray(chunk)
                # convert to the appropriate data type
                chunk = chunk.view(dtype)
                # sort out ordering and convert to the parent hyperslab dimensions
                chunk = chunk.reshape(-1, order='A')
                chunk = chunk.reshape(shape, order=order)
        except FileNotFoundError:  # could be an https file
            fs = fsspec.filesystem('http')
            with fs.open(rfile, 'rb') as open_file:
                # get the data
                chunk = read_block(open_file, offset, size)
                # reverse any compression and filters
                chunk = filter_pipeline(chunk, compression, filters)
                # make it a numpy array of bytes
                chunk = ensure_ndarray(chunk)
                # convert to the appropriate data type
                chunk = chunk.view(dtype)
                # sort out ordering and convert to the parent hyperslab dimensions
                chunk = chunk.reshape(-1, order='A')
                chunk = chunk.reshape(shape, order=order)
    else:
        class storeinfo: pass
        storeinfo.byte_offset = offset