Skip to content

Commit 549d89a

Browse files
committed
axis subsets
1 parent bd47be8 commit 549d89a

File tree

3 files changed

+124
-56
lines changed

3 files changed

+124
-56
lines changed

activestorage/active.py

+91-40
Original file line numberDiff line numberDiff line change
@@ -111,19 +111,20 @@ def __new__(cls, *args, **kwargs):
111111
"""Store reduction methods."""
112112
instance = super().__new__(cls)
113113
instance._methods = {
114-
"min": np.min,
115-
"max": np.max,
116-
"sum": np.sum,
114+
"min": np.ma.min,
115+
"max": np.ma.max,
116+
"sum": np.ma.sum,
117117
# For the unweighted mean we calculate the sum and divide
118118
# by the number of non-missing elements
119-
"mean": np.sum,
119+
"mean": np.ma.sum,
120120
}
121121
return instance
122122

123123
def __init__(
124124
self,
125125
uri,
126126
ncvar,
127+
axis=None,
127128
storage_type=None,
128129
max_threads=100,
129130
storage_options=None,
@@ -161,6 +162,16 @@ def __init__(
161162
if self.ncvar is None:
162163
raise ValueError("Must set a netCDF variable name to slice")
163164

165+
# Parse axis (note, if axis is None then we'll work out how
166+
# many dimensions there are at the time of an active
167+
# __getitem__ call).
168+
if axis is not None:
169+
if isinstance(axis, int):
170+
axis = (axis,)
171+
else:
172+
axis = tuple(axis)
173+
174+
self._axis = axis
164175
self._version = 1
165176
self._components = False
166177
self._method = None
@@ -293,7 +304,7 @@ def _get_active(self, method, *args):
293304
an array returned via getitem.
294305
"""
295306
raise NotImplementedError
296-
307+
297308
@_metricise
298309
def _get_selection(self, *args):
299310
"""
@@ -311,6 +322,9 @@ def _get_selection(self, *args):
311322
array = pyfive.indexing.ZarrArrayStub(self.ds.shape, self.ds.chunks)
312323
ds = self.ds.id
313324

325+
if self._axis is None:
326+
self._axis = tuple(range(len(ds.shape)))
327+
314328
self.metric_data['args'] = args
315329
self.metric_data['dataset shape'] = self.ds.shape
316330
self.metric_data['dataset chunks'] = self.ds.chunks
@@ -324,20 +338,30 @@ def _get_selection(self, *args):
324338
#stripped_indexer = [(a, b, c) for a,b,c in indexer]
325339
drop_axes = indexer.drop_axes and keepdims
326340

327-
# we use array._chunks rather than ds.chunks, as the latter is none in the case of
328-
# unchunked data, and we need to tell the storage the array dimensions in this case.
329-
return self._from_storage(ds, indexer, array._chunks, out_shape, dtype, compressor, filters, drop_axes)
341+
# we use array._chunks rather than ds.chunks, as the latter is
342+
# none in the case of unchunked data, and we need to tell the
343+
# storage the array dimensions in this case.
344+
return self._from_storage(ds, indexer, array._chunks, out_shape, dtype, compressor, filters, drop_axes, self._axis)
330345

331-
def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype, compressor, filters, drop_axes):
346+
def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype, compressor, filters, drop_axes, axis):
332347
method = self.method
333-
348+
need_counts = self.components or self._method == "mean"
349+
334350
if method is not None:
335-
out = []
336-
counts = []
351+
# Replace the size of each reduced axis with the number of
352+
# chunks along that axis
353+
out_shape = list(out_shape)
354+
for i in axis:
355+
out_shape[i] = indexer.dim_indexers[i].nchunks
356+
357+
out = np.ma.empty(out_shape, dtype=out_dtype, order=ds._order)
358+
if need_counts:
359+
counts = np.ma.empty(
360+
out_shape, dtype=out_dtype, order=ds._order
361+
)
337362
else:
338363
out = np.empty(out_shape, dtype=out_dtype, order=ds._order)
339-
counts = None # should never get touched with no method!
340-
364+
341365
# Create a shared session object.
342366
if self.storage_type == "s3" and self._version==2:
343367
if self.storage_options is not None:
@@ -378,29 +402,32 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype, compressor, f
378402
future = executor.submit(
379403
self._process_chunk,
380404
session, ds, chunks, chunk_coords, chunk_selection,
381-
counts, out_selection, compressor, filters, drop_axes=drop_axes)
405+
out_selection, compressor, filters, drop_axes=drop_axes)
382406
futures.append(future)
407+
383408
# Wait for completion.
384409
for future in concurrent.futures.as_completed(futures):
385410
try:
386-
result = future.result()
411+
result, count, out_selection = future.result()
387412
except Exception as exc:
388413
raise
389-
else:
390-
chunk_count +=1
391-
if method is not None:
392-
result, count = result
393-
out.append(result)
394-
counts.append(count)
395-
else:
396-
# store selected data in output
397-
result, selection = result
398-
out[selection] = result
414+
415+
chunk_count += 1
416+
417+
# Store the selected data
418+
out[out_selection] = result
419+
420+
# Store the counts for the selected data
421+
if need_counts:
422+
counts[out_selection] = count
399423

400424
if method is not None:
401425
# Apply the method (again) to aggregate the result
402-
out = method(out)
403-
shape1 = (1,) * len(out_shape)
426+
out = method(out, axis=axis, keepdims=True)
427+
428+
# Aggregate the counts
429+
if need_counts:
430+
n = np.ma.sum(counts, axis=axis, keepdims=True)
404431

405432
if self._components:
406433
# Return a dictionary of components containing the
@@ -415,9 +442,6 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype, compressor, f
415442
# reductions require the per-dask-chunk partial
416443
# reductions to retain these dimensions so that
417444
# partial results can be concatenated correctly.)
418-
out = out.reshape(shape1)
419-
420-
n = np.sum(counts).reshape(shape1)
421445
if self._method == "mean":
422446
# For the average, the returned component is
423447
# "sum", not "mean"
@@ -431,7 +455,11 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype, compressor, f
431455
# For the average, it is actually the sum that has
432456
# been created, so we need to divide by the sample
433457
# size.
434-
out = out / np.sum(counts).reshape(shape1)
458+
#
459+
# Note: It's OK if an element of 'n' is zero,
460+
# because it will necessarily correspond to
461+
# a masked value in 'out'.
462+
out = out / n
435463

436464
t2 = time.time()
437465
self.metric_data['reduction time (s)'] = t2-t1
@@ -453,24 +481,23 @@ def _get_endpoint_url(self):
453481

454482
return f"http://{urllib.parse.urlparse(self.filename).netloc}"
455483

456-
def _process_chunk(self, session, ds, chunks, chunk_coords, chunk_selection, counts,
484+
def _process_chunk(self, session, ds, chunks, chunk_coords, chunk_selection,
457485
out_selection, compressor, filters, drop_axes=None):
458486
"""
459487
Obtain part or whole of a chunk.
460488
461489
This is done by taking binary data from storage and filling
462490
the output array.
463491
464-
Note the need to use counts for some methods
465-
#FIXME: Do, we, it's not actually used?
466-
467492
"""
468493

469494
# retrieve coordinates from chunk index
470495
storeinfo = ds.get_chunk_info_from_chunk_coord(chunk_coords)
471496
offset, size = storeinfo.byte_offset, storeinfo.size
472497
self.data_read += size
473498

499+
axis = self._axis
500+
474501
if self.storage_type == 's3' and self._version == 1:
475502

476503
tmp, count = reduce_opens3_chunk(ds._fh, offset, size, compressor, filters,
@@ -483,6 +510,7 @@ def _process_chunk(self, session, ds, chunks, chunk_coords, chunk_selection, cou
483510
# S3: pass in pre-configured storage options (credentials)
484511
# print("S3 rfile is:", self.filename)
485512
parsed_url = urllib.parse.urlparse(self.filename)
513+
486514
bucket = parsed_url.netloc
487515
object = parsed_url.path
488516

@@ -504,6 +532,7 @@ def _process_chunk(self, session, ds, chunks, chunk_coords, chunk_selection, cou
504532
chunks,
505533
ds._order,
506534
chunk_selection,
535+
axis,
507536
operation=self._method)
508537
else:
509538
# special case for "anon=True" buckets that work only with e.g.
@@ -523,6 +552,7 @@ def _process_chunk(self, session, ds, chunks, chunk_coords, chunk_selection, cou
523552
chunks,
524553
ds._order,
525554
chunk_selection,
555+
axis,
526556
operation=self._method)
527557
elif self.storage_type=='ActivePosix' and self.version==2:
528558
# This is where the DDN Fuse and Infinia wrappers go
@@ -532,17 +562,38 @@ def _process_chunk(self, session, ds, chunks, chunk_coords, chunk_selection, cou
532562
# see https://github.com/valeriupredoi/PyActiveStorage/issues/33
533563
# so neither the returned data or the interface should be considered stable
534564
# although we will version changes.
565+
535566
tmp, count = reduce_chunk(self.filename, offset, size, compressor, filters,
536567
self.missing, ds.dtype,
537568
chunks, ds._order,
538-
chunk_selection, method=self.method)
539-
569+
chunk_selection, axis, method=self.method)
570+
540571
if self.method is not None:
541-
return tmp, count
572+
# Replace the index corresponding to each reduced axis
573+
# with its size-1 position in chunk-space.
574+
#
575+
# E.g. if 'out_selection' is (slice(0,12), slice(20,60)),
576+
# 'chunk_coord' is (1, 3), and 'axis' is (1,); then
577+
# 'out_selection' will become (slice(0,12),
578+
# slice(3,4)). If 'axis' were instead (0, 1) then
579+
# 'out_selection' would become (slice(1,2),
580+
# slice(3,4)).
581+
#
582+
# This makes sure that 'out_selection' puts 'tmp' in the
583+
# correct place of the numpy array defined by the method
584+
# that collates the 'tmp's for each chunk (currently
585+
# `_from_storage`).
586+
out_selection = list(out_selection)
587+
for i in axis:
588+
n = chunk_coords[i]
589+
out_selection[i] = slice(n, n+1)
590+
591+
return tmp, count, tuple(out_selection)
542592
else:
543593
if drop_axes:
544594
tmp = np.squeeze(tmp, axis=drop_axes)
545-
return tmp, out_selection
595+
596+
return tmp, None, out_selection
546597

547598
def _mask_data(self, data):
548599
"""

activestorage/reductionist.py

+7-4
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def get_session(username: str, password: str, cacert: typing.Optional[str]) -> r
2727

2828
def reduce_chunk(session, server, source, bucket, object,
2929
offset, size, compression, filters, missing, dtype, shape,
30-
order, chunk_selection, operation):
30+
order, chunk_selection, axis, operation):
3131
"""Perform a reduction on a chunk using Reductionist.
3232
3333
:param server: Reductionist server URL
@@ -49,12 +49,13 @@ def reduce_chunk(session, server, source, bucket, object,
4949
1), slice(1, 3, 1), slice(0, 1, 1))
5050
this defines the part of the chunk which is to be
5151
obtained or operated upon.
52+
:param axis: tuple of the axes to reduce (non-negative integers)
5253
:param operation: name of operation to perform
5354
:returns: the reduced data as a numpy array or scalar
5455
:raises ReductionistError: if the request to Reductionist fails
5556
"""
5657

57-
request_data = build_request_data(source, bucket, object, offset, size, compression, filters, missing, dtype, shape, order, chunk_selection)
58+
request_data = build_request_data(source, bucket, object, offset, size, compression, filters, missing, dtype, shape, order, chunk_selection, axis)
5859
if DEBUG:
5960
print(f"Reductionist request data dictionary: {request_data}")
6061
api_operation = "sum" if operation == "mean" else operation or "select"
@@ -134,7 +135,7 @@ def encode_missing(missing):
134135

135136
def build_request_data(source: str, bucket: str, object: str, offset: int,
136137
size: int, compression, filters, missing, dtype, shape,
137-
order, selection) -> dict:
138+
order, selection, axis) -> dict:
138139
"""Build request data for Reductionist API."""
139140
request_data = {
140141
'source': source,
@@ -145,6 +146,7 @@ def build_request_data(source: str, bucket: str, object: str, offset: int,
145146
'offset': int(offset),
146147
'size': int(size),
147148
'order': order,
149+
'axis': axis,
148150
}
149151
if shape:
150152
request_data["shape"] = shape
@@ -178,7 +180,8 @@ def decode_result(response):
178180
shape = json.loads(response.headers['x-activestorage-shape'])
179181
result = np.frombuffer(response.content, dtype=dtype)
180182
result = result.reshape(shape)
181-
count = json.loads(response.headers['x-activestorage-count'])
183+
count = json.loads(response.headers['x-activestorage-count']) # TODO this is wrong for now!
184+
count = np.frombuffer(response.content, dtype=dtype) # TODO this is wrong for now!
182185
return result, count
183186

184187

activestorage/storage.py

+26-12
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@
33

44
from numcodecs.compat import ensure_ndarray
55

6-
def reduce_chunk(rfile,
7-
offset, size, compression, filters, missing, dtype, shape,
8-
order, chunk_selection, method=None):
6+
def reduce_chunk(rfile, offset, size, compression, filters, missing, dtype, shape, order, chunk_selection, axis, method=None):
97
""" We do our own read of chunks and decoding etc
108
119
rfile - the actual file with the data
@@ -20,6 +18,7 @@ def reduce_chunk(rfile,
2018
(slice(0, 2, 1), slice(1, 3, 1), slice(0, 1, 1))
2119
this defines the part of the chunk which is to be obtained
2220
or operated upon.
21+
axis - tuple of the axes to reduce (non-negative integers)
2322
method - computation desired
2423
(in this Python version it's an actual method, in
2524
storage implementations we'll change to controlled vocabulary)
@@ -41,18 +40,15 @@ def reduce_chunk(rfile,
4140
chunk = chunk.reshape(shape, order=order)
4241

4342
tmp = chunk[chunk_selection]
43+
tmp = mask_missing(tmp, missing)
44+
4445
if method:
45-
if missing != (None, None, None, None):
46-
tmp = remove_missing(tmp, missing)
47-
# Check on size of tmp; method(empty) fails or gives incorrect
48-
# results
49-
if tmp.size:
50-
return method(tmp), tmp.size
51-
else:
52-
return tmp, 0
46+
N = np.ma.count(tmp, axis=axis, keepdims=True)
47+
tmp = method(tmp, axis=axis, keepdims=True)
5348
else:
54-
return tmp, None
49+
N = None
5550

51+
return tmp, N
5652

5753
def filter_pipeline(chunk, compression, filters):
5854
"""
@@ -73,6 +69,24 @@ def filter_pipeline(chunk, compression, filters):
7369
return chunk
7470

7571

72+
def mask_missing(data, missing):
73+
"""
74+
As we are using numpy, we can use a masked array; storage implementations
75+
will have to do this by hand
76+
"""
77+
fill_value, missing_value, valid_min, valid_max = missing
78+
79+
if fill_value:
80+
data = np.ma.masked_equal(data, fill_value)
81+
if missing_value:
82+
data = np.ma.masked_equal(data, missing_value)
83+
if valid_max:
84+
data = np.ma.masked_greater(data, valid_max)
85+
if valid_min:
86+
data = np.ma.masked_less(data, valid_min)
87+
88+
return data
89+
7690
def remove_missing(data, missing):
7791
"""
7892
As we are using numpy, we can use a masked array; storage implementations

0 commit comments

Comments
 (0)