[WIP] Incremental rechunking #28

Open · wants to merge 3 commits into base: master

Conversation

davidbrochart (Collaborator)

This is a limited implementation of incremental rechunking. There is still a lot to do, but I'd like to get early feedback on the approach.
Closes #8

codecov bot commented Jul 18, 2020

Codecov Report

Merging #28 into master will increase coverage by 0.75%.
The diff coverage is 100.00%.


@@            Coverage Diff             @@
##           master      #28      +/-   ##
==========================================
+ Coverage   88.94%   89.70%   +0.75%     
==========================================
  Files           2        2              
  Lines         190      204      +14     
  Branches       44       50       +6     
==========================================
+ Hits          169      183      +14     
  Misses         11       11              
  Partials       10       10              
Impacted Files       Coverage Δ
rechunker/api.py     93.52% <100.00%> (+0.72%) ⬆️

Continue to review full report at Codecov.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update fdddf0f...4506e97.

@rabernat (Member)

Thanks a lot for this PR @davidbrochart! I really appreciate your contribution. I will try to give a thorough review in the next few days.

@rabernat (Member) left a comment

Thanks a lot for this @davidbrochart. I have some questions about this implementation. I'd also like @TomAugspurger to have a look.

@@ -314,13 +374,21 @@ def _rechunk_array(
    source_read = dsa.from_zarr(
        source_array, chunks=read_chunks, storage_options=source_storage_options
    )
    if source_slice is not None:
        source_read = source_read[source_slice]
Member

It would be awesome to have access to Xarray's array adaptors here. Xarray has an internal implementation of lazily indexed arrays that would be preferable to slicing the dask array.

The downside of this approach is that, because source_read is much bigger than source_read[source_slice], we end up carrying around a big, unnecessary dask graph. This will be culled at compute time, but it still creates unnecessary overhead.
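
To illustrate that overhead, here is a minimal sketch (the store name "source.zarr" and the slice bounds are placeholders, not from this PR):

import dask
import dask.array as dsa

source_read = dsa.from_zarr("source.zarr")  # graph holds one task per source chunk
sliced = source_read[:20]                   # slicing adds tasks; nothing is dropped yet

print(len(sliced.__dask_graph__()))         # still references every source chunk
(culled,) = dask.optimize(sliced)           # culling happens when the graph is optimized/computed
print(len(culled.__dask_graph__()))         # only the chunks touched by the slice remain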

davidbrochart (Collaborator, Author)

I was thinking about that, but that would make xarray a dependency of this project, right? And we would use xarray.open_zarr instead of dask.array.from_zarr?

@shoyer (Collaborator) commented Jul 31, 2020

I agree, something like a "virtually sliced array" that effectively adds another slice onto supplied slices in __getitem__/__setitem__ would be really convenient here. This would let us reuse the same existing Executor interface for appending, just by passing in virtually sliced arrays for source and target.

I don't think we want to directly use xarray's array adapters (which come with lots of other baggage), but they are definitely good prior art. Rechunker's version can be much simpler than xarray's LazilyOuterIndexedArray, because it only needs to support slices of the form tuple(slice(int, int, None), ...), with start/stop bounds that never expand beyond the array itself.

Something like the following (untested!) might actually be enough, and should work OK with dask.array.from_array and dask.array.store:

class VirtuallySlicedArray:
    """Array-like view that confines all reads and writes to a fixed slice of `target`."""

    def __init__(self, target, key):
        # only plain, in-bounds slices with no step are supported
        if not all(
            k.start >= 0 and k.stop <= s and k.step is None
            for k, s in zip(key, target.shape)
        ):
            raise ValueError(f'invalid key {key} for shape {target.shape}')
        self.target = target
        self.key = key

    @property
    def shape(self):
        return tuple(k.stop - k.start for k in self.key)

    @property
    def dtype(self):
        return self.target.dtype

    def _expand_key(self, key):
        # offset the incoming key by the fixed slice so it indexes into `target`
        return tuple(
            slice(k1.start + k2.start, k1.start + k2.stop)
            for k1, k2 in zip(self.key, key)
        )

    def __getitem__(self, key):
        return self.target[self._expand_key(key)]

    def __setitem__(self, key, value):
        self.target[self._expand_key(key)] = value
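
A hedged usage sketch of the adapter above (also untested; the store names, slice, and chunk size are placeholders):

import zarr
import dask.array as dsa

source = zarr.open("source.zarr", mode="r")   # e.g. a 1-D array of length >= 20
target = zarr.open("target.zarr", mode="a")   # existing array of the same length

key = (slice(0, 20),)                         # the increment being rechunked/appended
arr = dsa.from_array(VirtuallySlicedArray(source, key), chunks=(10,))
dsa.store(arr, VirtuallySlicedArray(target, key), lock=False)

Here dask only ever reads and writes within the virtual slice, so the existing Executor machinery could be reused unchanged.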


# create target
shape = tuple(int(x) for x in shape) # ensure python ints for serialization
target_chunks = tuple(int(x) for x in target_chunks)
int_chunks = tuple(int(x) for x in int_chunks)
write_chunks = tuple(int(x) for x in write_chunks)

if target_append:
    target_store_or_group_original = target_store_or_group
    target_store_or_group = "to_append_" + target_store_or_group
Member

I don't think this will work on stores that are not paths. Note that the store can also be a mutable mapping (e.g. from s3fs). This is not covered by tests but should be!
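
One possible direction (just a sketch, not part of this PR; the helper name is made up) is to only prefix string paths and require an explicit temporary store otherwise:

from collections.abc import MutableMapping

def append_target_for(store, temp_store=None, prefix="to_append_"):
    # Plain paths/URLs can be prefixed, but a MutableMapping store (e.g. an
    # s3fs/fsspec mapping) has no path to prefix, so a temporary store has to
    # be supplied explicitly.
    if isinstance(store, str):
        return prefix + store
    if isinstance(store, MutableMapping):
        if temp_store is None:
            raise ValueError("an explicit temporary store is required for mapping stores")
        return temp_store
    raise TypeError(f"unsupported store type: {type(store)}")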

davidbrochart (Collaborator, Author)

Yes, this is what I meant by "limited implementation". I will work on that later.

return self._target

target = zarr.open(self._target_original)
target.append(self._target)
Member

I must admit, I don't understand why we are calling append here. Shouldn't the append operation happen inside the self.compute call?

davidbrochart (Collaborator, Author)

My idea was that when we append (target_append=True), we want to append new source data to an already existing zarr target_store. So we rechunk the new source data into a temporary target store (which is 'to_append_' + target_store for now, just to keep it simple) created by the self.compute call, and then use zarr's append method to append it to the original target_store. Do you think this is too expensive, and that we should append directly to the original target store?
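
Roughly, the flow described above would be (a sketch; store names are placeholders):

import zarr

# after self.compute has rechunked the new data into the temporary store
target = zarr.open("target_store")                # the pre-existing target
to_append = zarr.open("to_append_target_store")   # the freshly rechunked increment
target.append(to_append)                          # zarr concatenates along axis 0

Note that this final copy runs in the client process, which is the concern raised below.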

Member

I think it is best if we try to append directly to the original target store.

Also, we want to have all the computations inside the dask graph. As currently coded (append in execute), this part of the operation will not be run by dask, but rather will run from the client. That will be a huge bottleneck for large arrays.

Member

Agreed that we want this to happen inside the compute, so it should be added as a task.

Member

> so it should be added as a task.

But not just a single task. We need to parallelize the append operations just like we do all other writes, via dask.array.to_zarr.
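
A hedged sketch of that (not the PR's code; the store names and append axis are assumptions): resize the existing target, then write only the new region in parallel with dask.

import zarr
import dask.array as dsa

target = zarr.open("target_store", mode="a")       # existing array, shape (n, ...)
new = dsa.from_zarr("rechunked_increment_store")   # rechunked new data, shape (m, ...)

n, m = target.shape[0], new.shape[0]
target.resize((n + m,) + target.shape[1:])         # grow along the append axis

region = (slice(n, n + m),) + tuple(slice(0, s) for s in target.shape[1:])
dsa.store(new, target, regions=region)             # one write task per chunk, inside the graph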

The slice of the source to rechunk. The structure depends on ``source``.

- For a single array source, ``source_slice`` can be either a tuple (e.g.
``((0, 20), None, None)`` or a dictionary (e.g. ``{'time': (0, 20),
Member

I think these continuation lines need to be aligned with the F in For.

Comment on lines +84 to +85
def prod(iterable):
    return reduce(operator.mul, iterable, 1)
Member

Use np.prod?
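
For example, a drop-in equivalent of the reduce-based helper in the diff above:

import numpy as np

def prod(iterable):
    # np.prod of an empty sequence is 1, matching reduce(operator.mul, iterable, 1)
    return int(np.prod(list(iterable)))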

Comment on lines +80 to +82
import operator
from functools import reduce
import numpy as np
Member

Imports can go at the top.

@rabernat (Member)

I would wait on further action on this until #30 is merged. That is a pretty significant refactor to the internal structure of the package.

@davidbrochart (Collaborator, Author)

Yes, I agree.

@rabernat (Member)

@davidbrochart, now that #30 is done, we might want to revisit this.

Perhaps @shoyer has some ideas about how to best incorporate incremental rechunking / appending into the new code structure.

Again it seems like xarray's lazy indexing adaptors could come in very handy.

@davidbrochart (Collaborator, Author)

@rabernat, do you mean that rechunker would depend on xarray, or that we would pull xarray's lazy indexing logic into rechunker's code?

@rsemlal-murmuration

Was there any progress on this since then?

@rabernat (Member)

Hi @rsemlal-murmuration - it turns out that incremental rechunking is pretty tricky (lots of edge cases)! There hasn't been any work on this recently in rechunker.

However, at Earthmover, we are exploring many different approaches to this problem currently.

@rsemlal-murmuration

Understood! Thanks for the quick reply!

Looking into this as well at the moment.
The workaround we are considering: using rechunker to write the data slice into a new intermediate location, then appending it from there to the existing dataset using xarray.to_zarr(mode="a"). But it is obviously not the most efficient approach.

Would be interested if there are other approaches/workarounds out there.
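
For reference, a rough sketch of that workaround (the dataset, chunk sizes, stores, and append dimension are all placeholders):

import xarray as xr
from rechunker import rechunk

# 1. rechunk the new slice of data into an intermediate zarr store
plan = rechunk(
    new_slice_ds,                                   # an xarray.Dataset holding the new data
    target_chunks={"var": {"time": 100, "x": 50}},  # desired chunking, per variable
    max_mem="1GB",
    target_store="intermediate.zarr",
    temp_store="tmp.zarr",
)
plan.execute()

# 2. append the rechunked slice to the existing dataset
xr.open_zarr("intermediate.zarr").to_zarr("existing.zarr", mode="a", append_dim="time")

The extra intermediate copy is what makes this less efficient, as noted above.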

Labels: None yet
Projects: None yet
Development: Successfully merging this pull request may close these issues: Incremental rechunk
5 participants