Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream directly to packed during verdi archive import #6417

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/aiida/cmdline/commands/cmd_archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,12 @@ class ExtrasImportCode(Enum):
is_flag=True,
help='Determine entities to import, but do not actually import them. Deprecated, please use `--dry-run` instead.',
)
@click.option(
'--packed',
is_flag=True,
default=False,
help='Stream repository files directly to `packed`, rather than `loose`. Might provide speedup for large archives.',
)
@options.DRY_RUN(help='Determine entities to import, but do not actually import them.')
@decorators.with_dbenv()
@click.pass_context
Expand All @@ -375,6 +381,7 @@ def import_archive(
group,
test_run,
dry_run,
packed,
):
"""Import archived data to a profile.

Expand Down Expand Up @@ -407,6 +414,7 @@ def import_archive(
'create_group': import_group,
'group': group,
'test_run': dry_run,
'packed': packed,
}

for archive, web_based in all_archives:
Expand Down Expand Up @@ -466,7 +474,7 @@ def _gather_imports(archives, webpages) -> List[Tuple[str, bool]]:


def _import_archive_and_migrate(
ctx: click.Context, archive: str, web_based: bool, import_kwargs: dict, try_migration: bool
ctx: click.Context, archive: str, web_based: bool, import_kwargs: dict, try_migration: bool, packed: bool = False
):
"""Perform the archive import.

Expand Down
41 changes: 41 additions & 0 deletions src/aiida/cmdline/commands/cmd_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,44 @@ def profile_delete(force, delete_data, profiles):

get_config().delete_profile(profile.name, delete_storage=delete_data)
echo.echo_success(f'Profile `{profile.name}` was deleted.')


@verdi_profile.command('flush')
@arguments.PROFILE(required=True)
@options.FORCE(help='Skip any prompts for confirmation.')
def profile_flush(force, profile):
    """Delete all data of a profile, keeping the profile itself.

    The PROFILE argument takes the name of the profile whose stored data (nodes and groups) will be
    deleted. This operation cannot be undone.
    """
    from aiida.manage.manager import get_manager
    from aiida.orm import Group, Node, QueryBuilder
    from aiida.tools import delete_nodes

    manager = get_manager()
    storage = manager.get_profile_storage()

    if not force:
        echo.echo_warning('This operation cannot be undone, are you sure you want to continue?', nl=False)
        if not click.confirm(''):
            echo.echo_report(f'Deleting of `{profile.name}` cancelled.')
            return

    # Delete all nodes: project only the pk instead of loading full ORM instances into memory.
    node_ids = QueryBuilder().append(Node, project='id').all(flat=True)
    if node_ids:
        delete_nodes(node_ids, dry_run=False)

    # Delete all groups.
    for group in Group.collection.all():
        Group.collection.delete(group.pk)

    # Clean up the file repository (e.g. pruning unreferenced objects, repacking).
    storage.maintain(full=True, dry_run=False)

    # NOTE(review): users and computers are intentionally left untouched for now — confirm whether
    # they should also be flushed.
18 changes: 18 additions & 0 deletions src/aiida/repository/backend/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,24 @@ def put_object_from_filelike(self, handle: BinaryIO) -> str:
def _put_object_from_filelike(self, handle: BinaryIO) -> str:
    """Backend-specific hook that stores the byte contents of ``handle`` and returns the object key.

    Called by ``put_object_from_filelike`` after validating the handle; concrete backends implement it.
    """
    pass

def put_objects_from_filelike_packed(self, handle_list: list) -> list:
    """Store the byte contents of several filelike objects directly into a pack file.

    Unlike ``put_object_from_filelike``, which writes a single object (typically to the loose
    store), this streams all objects in bulk, which can be significantly faster for large imports.

    :param handle_list: list of filelike objects with the byte contents to be stored.
    :return: the generated fully qualified identifiers for the objects within the repository.
    :raises AttributeError: if the concrete backend does not provide
        ``_put_objects_from_filelike_packed`` — it is not (yet) part of the abstract interface,
        so only backends that implement it (e.g. the disk-object-store backend) support this call.
    """
    # NOTE(review): no per-handle byte-stream validation is performed here, unlike the
    # single-object variant; add an `is_readable_byte_stream` check per handle if needed.
    return self._put_objects_from_filelike_packed(handle_list)

def put_object_from_file(self, filepath: Union[str, pathlib.Path]) -> str:
"""Store a new object with contents of the file located at `filepath` on this file system.

Expand Down
7 changes: 7 additions & 0 deletions src/aiida/repository/backend/disk_object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ def _put_object_from_filelike(self, handle: t.BinaryIO) -> str:
with self._container as container:
return container.add_streamed_object(handle)

def _put_objects_from_filelike_packed(self, handle_list) -> str:
"""Store the byte contents of a list of files in the repository."""
with self._container as container:
return container.add_streamed_objects_to_pack(
stream_list=handle_list,
)

def has_objects(self, keys: t.List[str]) -> t.List[bool]:
    """Return, for each key in ``keys``, whether an object with that key exists in the container."""
    with self._container as open_container:
        existence_flags = open_container.has_objects(keys)
    return existence_flags
Expand Down
29 changes: 28 additions & 1 deletion src/aiida/tools/archive/imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def import_archive(
group: Optional[orm.Group] = None,
test_run: bool = False,
backend: Optional[StorageBackend] = None,
packed: Optional[bool] = False,
) -> Optional[int]:
"""Import an archive into the AiiDA backend.

Expand Down Expand Up @@ -201,7 +202,12 @@ def import_archive(

# now the transaction has been successfully populated, but not committed, we add the repository files
# if the commit fails, this is not so much an issue, since the files can be removed on repo maintenance
_add_files_to_repo(backend_from, backend, new_repo_keys)
if packed:
IMPORT_LOGGER.report('Adding repository files to `packed`')
_add_files_to_repo_packed(backend_from, backend, new_repo_keys)
else:
IMPORT_LOGGER.report('Adding repository files to `loose`')
_add_files_to_repo(backend_from, backend, new_repo_keys)

IMPORT_LOGGER.report('Committing transaction to database...')

Expand Down Expand Up @@ -1188,3 +1194,24 @@ def _add_files_to_repo(backend_from: StorageBackend, backend_to: StorageBackend,
f'Archive repository key is different to backend key: {key!r} != {backend_key!r}'
)
progress.update()


# This is probably not having any effect here, instead, I defined _put_object_from_filelike_packed in
# AbstractRepositoryBackend
def _add_files_to_repo_packed(backend_from: StorageBackend, backend_to: StorageBackend, new_keys: Set[str]) -> None:
"""Add the new files to the repository."""
if not new_keys:
return None

repository_to = backend_to.get_repository()
repository_from = backend_from.get_repository()
# print('repository_from', repository_from)
# print('repository_to', repository_to)

with get_progress_reporter()(desc='Adding archive files to repository', total=len(new_keys)) as progress:
from io import BytesIO

from_hashes = list(repository_from.list_objects())
from_bytes_io_list = [BytesIO(repository_from.get_object_content(from_hash)) for from_hash in from_hashes]

backend_key = repository_to.put_objects_from_filelike_packed(from_bytes_io_list)
Loading