Skip to content

Commit

Permalink
Merge pull request #139 from valohai/output-properties-helper
Browse files Browse the repository at this point in the history
Output properties helper
  • Loading branch information
hylje authored Dec 19, 2024
2 parents 51dfaaf + 4eaefd8 commit 497252f
Show file tree
Hide file tree
Showing 3 changed files with 331 additions and 0 deletions.
201 changes: 201 additions & 0 deletions tests/test_output_properties.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
"""Test handling the execution output metadata properties."""

import json
import logging
import random
import string
import sys
import time
from pathlib import Path

import pytest # type: ignore

import valohai

logger = logging.getLogger(__name__)


def test_create_properties(tmp_metadata_file):
    """Test creating a properties file."""
    expected = {
        "file.txt": {"foo": "bar"},
        "path/to/file.txt": {"baz": "qux"},
        "path/to/another/file.txt": {"quux": "quuz"},
    }

    with valohai.output_properties() as properties:
        # file in the outputs directory
        properties.add(file="file.txt", properties={"foo": "bar"})
        # file in a subdirectory
        properties.add(file="path/to/file.txt", properties={"baz": "qux"})
        # file can also be a Path object
        properties.add(
            file=Path("path/to/another/file.txt"), properties={"quux": "quuz"}
        )

    # check that the properties are set in memory
    for file_path, props in expected.items():
        assert properties._files_properties.get(file_path) == props

    # check that the properties are saved to the file
    saved_properties = read_json_lines(tmp_metadata_file)
    for file_path, props in expected.items():
        assert saved_properties.get(file_path) == props


def test_add_to_existing_properties(tmp_metadata_file):
    """You can add new properties to a file that already has properties."""
    updates = [
        {"foo": "bar", "baz": "will be overwritten"},
        {"baz": "can overwrite existing value"},
        {"corge": "can add new properties"},
    ]

    with valohai.output_properties() as properties:
        for props in updates:
            properties.add(file="file.txt", properties=props)

    # later additions merge into (and may overwrite) earlier properties
    assert properties._files_properties.get("file.txt") == {
        "foo": "bar",
        "baz": "can overwrite existing value",
        "corge": "can add new properties",
    }


def test_add_files_to_dataset(tmp_metadata_file, random_string):
    """Add files to a new dataset version."""
    with valohai.output_properties() as properties:
        dataset_version_1 = properties.dataset_version_uri("dataset-1", "version")
        dataset_version_2 = properties.dataset_version_uri(
            "dataset-2", "another-version"
        )

        properties.add(
            file="properties_and_dataset_version.txt",
            properties={"foo": "bar"},
        )
        properties.add_to_dataset(
            file="properties_and_dataset_version.txt",
            dataset_version=dataset_version_1,
        )

        properties.add_to_dataset(
            file="only_dataset_version.txt",
            dataset_version=dataset_version_1,
        )

        properties.add(
            file="properties_and_two_datasets.txt",
            properties={
                "name": "test with properties and datasets",
                "random": random_string,
            },
        )
        properties.add_to_dataset(
            file="properties_and_two_datasets.txt",
            dataset_version=dataset_version_1,
        )
        properties.add_to_dataset(
            file="properties_and_two_datasets.txt",
            dataset_version=dataset_version_2,
        )

    assert properties._files_properties.get("properties_and_dataset_version.txt") == {
        "foo": "bar",
        "valohai.dataset-versions": ["dataset://dataset-1/version"],
    }, "Should add both properties and dataset version metadata"
    assert properties._files_properties.get("only_dataset_version.txt") == {
        "valohai.dataset-versions": ["dataset://dataset-1/version"]
    }, "Should add dataset version metadata without any properties"

    # The dataset-versions value is a list built from a set, so its order is
    # an implementation detail (and can vary with string hash randomization).
    # Compare it as a set instead of relying on a particular list order.
    two_datasets = properties._files_properties.get("properties_and_two_datasets.txt")
    assert two_datasets is not None, "Should have metadata for the file"
    assert set(two_datasets.get("valohai.dataset-versions", [])) == {
        "dataset://dataset-1/version",
        "dataset://dataset-2/another-version",
    }, "Should add multiple dataset versions"
    assert {
        key: value
        for key, value in two_datasets.items()
        if key != "valohai.dataset-versions"
    } == {
        "name": "test with properties and datasets",
        "random": random_string,
    }, "Should add both properties and multiple dataset versions"


def test_preserve_existing_properties_between_contexts(tmp_metadata_file):
    """You should be able to add metadata bit by bit over several contexts."""
    with valohai.output_properties() as first_context:
        first_context.add(file="file.txt", properties={"foo": "bar"})

    # a later context re-loads what the first one saved and merges into it
    with valohai.output_properties() as second_context:
        second_context.add(file="file.txt", properties={"baz": "qux"})
        second_context.add(file="another_file.txt", properties={"baz": "qux"})

    persisted = read_json_lines(tmp_metadata_file)
    assert persisted.get("file.txt") == {"foo": "bar", "baz": "qux"}
    assert persisted.get("another_file.txt") == {"baz": "qux"}


def test_large_number_of_files(tmp_metadata_file, random_string):
    """Test handling metadata for a very large number of outputs."""
    test_properties = {
        "foo": "bar",
        "baz": "this is a longer metadata string",
        "random": random_string,
        "number": 42,
    }
    nr_of_files = 100_000

    start = time.perf_counter()
    with valohai.output_properties() as properties:
        dataset_version = properties.dataset_version_uri("test-dataset", "v1")

        for i in range(nr_of_files):
            properties.add(
                file=f"file_{i}.txt",
                properties=test_properties,
            )
            properties.add_to_dataset(
                file=f"file_{i}.txt",
                dataset_version=dataset_version,
            )
    end = time.perf_counter()

    assert (
        len(properties.properties_file.read_bytes().splitlines()) == nr_of_files
    ), "Should have written all entries to file"

    elapsed_time = end - start
    logger.debug(f"File entries: {nr_of_files:,}; elapsed time: {elapsed_time:.2f} s")
    # Expected runtime is roughly 1 second; the 2x margin keeps the test from
    # flaking on slow CI machines while still catching large regressions.
    assert (
        elapsed_time < 2.0
    ), "Expected to run in roughly 1 second; a significant performance regression is likely"


def read_json_lines(properties_file: Path):
    """
    Read a saved properties JSON lines file back into a dictionary.
    Dictionary format: {file_path: metadata, ...}
    """
    files_to_metadata = {}
    for raw_line in properties_file.read_bytes().splitlines():
        record = json.loads(raw_line)
        files_to_metadata[record["file"]] = record["metadata"]
    return files_to_metadata


@pytest.fixture
def tmp_metadata_file(tmp_path, monkeypatch) -> Path:
    """
    Create a temporary metadata file for testing.
    NOTE: overrides the default outputs path function the OutputProperties class uses.
    """
    output_properties_module = sys.modules["valohai.output_properties"]
    monkeypatch.setattr(
        output_properties_module, "get_outputs_path", lambda: str(tmp_path)
    )
    return tmp_path / "valohai.metadata.jsonl"


@pytest.fixture
def random_string() -> str:
    """Return a 1000-character random alphanumeric string."""
    alphabet: str = string.ascii_letters + string.digits
    chars = [random.choice(alphabet) for _ in range(1000)]
    return "".join(chars)
2 changes: 2 additions & 0 deletions valohai/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from valohai.inputs import inputs
from valohai.internals.global_state import distributed
from valohai.metadata import logger
from valohai.output_properties import output_properties
from valohai.outputs import outputs
from valohai.parameters import parameters
from valohai.prepare_impl import prepare
Expand All @@ -17,6 +18,7 @@
"distributed",
"inputs",
"logger",
"output_properties",
"outputs",
"parameters",
"prepare",
Expand Down
128 changes: 128 additions & 0 deletions valohai/output_properties.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""Execution output properties helper.
The properties are saved in a `valohai.metadata.jsonl` file in the outputs directory
in JSON lines format.
"""

import json
import logging
from collections import Counter, defaultdict
from itertools import chain
from pathlib import Path
from typing import Any, DefaultDict, Dict, Union

from valohai.paths import get_outputs_path

# Type aliases for the properties bookkeeping.
File = Union[str, Path]  # path to the file (relative to outputs directory)
Properties = Dict[str, Any]  # metadata properties for a file
FilesProperties = DefaultDict[File, Properties]
DatasetVersionURI = str  # dataset version URI (e.g. 'dataset://dataset-1/version')

# Use a module-named logger rather than the root logger, per logging convention.
logger = logging.getLogger(__name__)


class OutputProperties:
    """Helper for setting properties for output files.

    Use as a context manager: on entry, any previously saved properties are
    loaded from the metadata file (so metadata can be built up over several
    contexts); on exit, all properties are written back and a summary of
    created dataset versions is printed to the execution log.
    """

    # Map of file path (str) -> properties dict for that file.
    _files_properties: FilesProperties

    def __init__(self) -> None:
        # Values must be plain dicts: `defaultdict(FilesProperties)` would use
        # the typing alias as the factory and create factory-less defaultdicts,
        # inconsistent with the plain dicts loaded from an existing file.
        self._files_properties = defaultdict(dict)
        self.properties_file = Path(get_outputs_path()) / "valohai.metadata.jsonl"

    def __enter__(self) -> "OutputProperties":
        self._initialize_existing_properties()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:  # type: ignore[no-untyped-def]
        # Save even when the body raised, so partial metadata is not lost.
        self._save()
        self._log_created_datasets()

    def add(
        self,
        *,
        file: File,
        properties: Properties,
    ) -> None:
        """
        Add properties to a file.
        If the file already has properties, the new properties will be added
        to them; existing keys are overwritten.
        Args:
            file: The path to the file (relative to the execution outputs root directory).
            properties: The metadata properties for the file.
        """
        self._files_properties[str(file)].update(properties)

    def add_to_dataset(self, *, file: File, dataset_version: DatasetVersionURI) -> None:
        """
        Add a file to a dataset.
        Adding the same file to the same dataset version twice is a no-op.
        Args:
            file: The path to the file (relative to the execution outputs root directory).
            dataset_version: The dataset version to add the file to.
        """
        # Deduplicate via a set; the stored value stays a JSON-friendly list.
        dataset_versions = set(
            self._files_properties[str(file)].get("valohai.dataset-versions", [])
        )
        dataset_versions.add(dataset_version)
        self.add(
            file=file,
            properties={"valohai.dataset-versions": list(dataset_versions)},
        )

    @staticmethod
    def dataset_version_uri(dataset: str, version: str) -> DatasetVersionURI:
        """Return the dataset URI for the given dataset and version."""
        return f"dataset://{dataset}/{version}"

    def _initialize_existing_properties(self) -> None:
        """Load previously saved properties so repeated contexts accumulate."""
        try:
            for json_line in self.properties_file.read_bytes().splitlines():
                line = json.loads(json_line)
                # Only accept well-formed entries; skip anything else silently.
                if isinstance(line.get("file"), str) and "metadata" in line:
                    self._files_properties[line["file"]] = line["metadata"]
        except FileNotFoundError:
            # No metadata file yet -- start with an empty mapping.
            return

    def _save(self) -> None:
        """Write all file properties to the metadata file as JSON lines."""
        self.properties_file.write_text(
            "".join(
                format_line(file_path, file_metadata)
                for file_path, file_metadata in self._files_properties.items()
            ),
            encoding="utf-8",
        )

    def _log_created_datasets(self) -> None:
        """Print out a summary of created datasets to the execution log."""
        datasets = [
            file_metadata["valohai.dataset-versions"]
            for file_metadata in self._files_properties.values()
            if file_metadata.get("valohai.dataset-versions")
        ]
        if not datasets:
            return
        # Count how many files ended up in each dataset version.
        dataset_counter = Counter(chain.from_iterable(datasets))
        for dataset, nr_of_files in dataset_counter.items():
            print(f"Created dataset version '{dataset}' with {nr_of_files:,} files")  # noqa: T201


output_properties = OutputProperties


def format_line(file_path: File, file_metadata: Properties) -> str:
    """Format metadata for an output file into a format Valohai understands.
    Args:
        file_path: The path to the file (relative to the execution outputs root directory).
        file_metadata: The metadata for the file.
    """
    entry = {
        "file": file_path,
        "metadata": file_metadata,
    }
    return json.dumps(entry) + "\n"

0 comments on commit 497252f

Please sign in to comment.