Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Proposal] ENH: Add context manager for zarr store cleanup #113

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions rechunker/api.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
"""User-facing functions."""
from __future__ import annotations

import contextlib
import html
import textwrap
from collections import defaultdict
from typing import Union

import dask
import dask.array
import fsspec
import xarray
import zarr
from fsspec import AbstractFileSystem
from fsspec.implementations.local import LocalFileSystem
from xarray.backends.zarr import (
DIMENSION_KEY,
encode_zarr_attr_value,
Expand Down Expand Up @@ -579,3 +585,45 @@ def _setup_array_rechunk(
int_proxy = ArrayProxy(int_array, int_chunks)
write_proxy = ArrayProxy(target_array, write_chunks)
return CopySpec(read_proxy, int_proxy, write_proxy)


@contextlib.contextmanager
def rechunk_cm(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be nice if we could just call rechunk directly as a context manager and get the right behavior, rather than introducing a new function to the API.

Copy link
Author

@bzah bzah Jul 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, I renamed rechunk to _unsafe_rechunk and rechunk_cm to rechunk. Only the later is exposed in init

source,
target_chunks,
max_mem,
target_store: str,
target_options=None,
temp_store: str | None = None,
temp_options=None,
executor: str | CopySpecExecutor = "dask",
filesystem: str | AbstractFileSystem = LocalFileSystem(),
keep_target_store: bool = True,
) -> Rechunked:
try:
if isinstance(filesystem, str):
filesystem = fsspec.filesystem(filesystem)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should take the same options as whatever filesystem we are going to operate on (target_options or temp_options?)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Martin if you can turn this into a suggested change, I will merge it in. We let @bzah's PR sit for a long time, so I'm not expecting him to be super responsive right now.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused here why there are temp_options and target_options, but only one filesystem - that's stopping me from knowing which to use.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. In general we support using different filesystems for temp vs. target. This new function assumes that they are both paths within the same filesystem.

This comment was marked as off-topic.

Copy link
Author

@bzah bzah Jul 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are now 2 filesystems, one for each store.

if filesystem.exists(target_store):
raise FileExistsError(target_store)
_rm_store(temp_store, filesystem)
yield rechunk(
source=source,
target_chunks=target_chunks,
max_mem=max_mem,
target_store=target_store,
target_options=target_options,
temp_store=temp_store,
temp_options=temp_options,
executor=executor,
)
finally:
_rm_store(temp_store, filesystem)
if not keep_target_store:
_rm_store(target_store, filesystem)


def _rm_store(store: str, filesystem: AbstractFileSystem):
try:
filesystem.rm(store, recursive=True, maxdepth=100)
except FileNotFoundError:
pass
118 changes: 118 additions & 0 deletions tests/test_rechunk_cm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import dask
import dask.core
import numpy as np
import pytest
import xarray
from fsspec.implementations.local import LocalFileSystem
from fsspec.implementations.memory import MemoryFileSystem

from rechunker import api
from unittest.mock import MagicMock, patch

TEST_DATASET = xarray.DataArray(
data=np.empty((10, 10)),
coords={"x": range(0, 10), "y": range(0, 10)},
dims=["x", "y"],
name="test_data",
).to_dataset()
LOCAL_FS = LocalFileSystem()
Mem = MemoryFileSystem()
TARGET_STORE_NAME = "target_store.zarr"
TMP_STORE_NAME = "tmp.zarr"


class Test_rechunk_cm:
def _clean(self, stores):
for s in stores:
try:
LOCAL_FS.rm(s, recursive=True, maxdepth=100)
except:
pass

@pytest.fixture(autouse=True)
def _wrap(self):
self._clean([TMP_STORE_NAME, TARGET_STORE_NAME])
with dask.config.set(scheduler="single-threaded"):
yield
self._clean([TMP_STORE_NAME, TARGET_STORE_NAME])

@patch("rechunker.api.rechunk")
def test_rechunk_cm__args_sent_as_is(self, rechunk_func: MagicMock):
with api.rechunk_cm(
source="source",
target_chunks={"truc": "bidule"},
max_mem="42KB",
target_store="target_store.zarr",
temp_store="tmp_store.zarr",
target_options=None,
temp_options=None,
executor="dask",
filesystem=LOCAL_FS,
keep_target_store=False,
):
rechunk_func.assert_called_with(
source="source",
target_chunks={"truc": "bidule"},
max_mem="42KB",
target_store="target_store.zarr",
target_options=None,
temp_store="tmp_store.zarr",
temp_options=None,
executor="dask",
)

def test_rechunk_cm__remove_every_stores(self):
with api.rechunk_cm(
source=TEST_DATASET,
target_chunks={"x": 2, "y": 2},
max_mem="42KB",
target_store="target_store.zarr",
temp_store="tmp_store.zarr",
target_options=None,
temp_options=None,
executor="dask",
filesystem=LOCAL_FS,
keep_target_store=False,
) as plan:
plan.execute()
assert LOCAL_FS.exists("target_store.zarr")
assert LOCAL_FS.exists("tmp_store.zarr")
assert not LOCAL_FS.exists("tmp_store.zarr")
assert not LOCAL_FS.exists("target_store.zarr")

def test_rechunk_cm__keep_target(self):
with api.rechunk_cm(
source=TEST_DATASET,
target_chunks={"x": 2, "y": 2},
max_mem="42KB",
target_store="target_store.zarr",
temp_store="tmp_store.zarr",
target_options=None,
temp_options=None,
executor="dask",
filesystem=LOCAL_FS,
keep_target_store=True,
) as plan:
plan.execute()
assert LOCAL_FS.exists("target_store.zarr")
assert LOCAL_FS.exists("tmp_store.zarr")
assert LOCAL_FS.exists("target_store.zarr")
assert not LOCAL_FS.exists("tmp_store.zarr")

def test_rechunk_cm__error_target_exist(self):
f = LOCAL_FS.open("target_store.zarr", "x")
f.close()
with pytest.raises(FileExistsError):
with api.rechunk_cm(
source=TEST_DATASET,
target_chunks={"x": 2, "y": 2},
max_mem="42KB",
target_store="target_store.zarr",
temp_store="tmp_store.zarr",
target_options=None,
temp_options=None,
executor="dask",
filesystem=LOCAL_FS,
keep_target_store=False,
):
pass