Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update GET /project/{project_id}/status/images/ #2396

Merged
merged 13 commits into from
Apr 2, 2025
119 changes: 94 additions & 25 deletions fractal_server/app/routes/api/v2/history.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
from typing import Any
from typing import Optional

from fastapi import APIRouter
from fastapi import Depends
from fastapi import HTTPException
from fastapi import status
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from pydantic import Field
from pydantic import field_validator
from pydantic import model_validator
from sqlmodel import func
from sqlmodel import select

Expand All @@ -25,15 +30,41 @@
from fractal_server.app.routes.pagination import get_pagination_params
from fractal_server.app.routes.pagination import PaginationRequest
from fractal_server.app.routes.pagination import PaginationResponse
from fractal_server.app.schemas._filter_validators import (
validate_attribute_filters,
)
from fractal_server.app.schemas._validators import root_validate_dict_keys
from fractal_server.app.schemas.v2 import HistoryRunReadAggregated
from fractal_server.app.schemas.v2 import HistoryUnitRead
from fractal_server.app.schemas.v2 import HistoryUnitStatus
from fractal_server.app.schemas.v2 import HistoryUnitStatusQuery
from fractal_server.app.schemas.v2 import ImageLogsRequest
from fractal_server.app.schemas.v2 import ZarrUrlAndStatus
from fractal_server.app.schemas.v2 import SingleImageWithStatus
from fractal_server.images.models import AttributeFiltersType
from fractal_server.images.tools import aggregate_attributes
from fractal_server.images.tools import aggregate_types
from fractal_server.images.tools import filter_image_list
from fractal_server.images.tools import merge_type_filters
from fractal_server.logger import set_logger


class AttributesQuery(BaseModel):
    """
    Request body carrying image attribute filters.

    Attributes:
        attribute_filters: Mapping from attribute name to accepted values;
            defaults to an empty dict (no attribute filtering).
    """

    attribute_filters: AttributeFiltersType = Field(default_factory=dict)

    # Run the shared `root_validate_dict_keys` helper on the raw payload
    # before field parsing (pydantic v2 `mode="before"` model validator).
    _dict_keys = model_validator(mode="before")(
        classmethod(root_validate_dict_keys)
    )
    # Validate the parsed `attribute_filters` field with the shared
    # schema-level helper (same validation used by other filter schemas).
    _attribute_filters = field_validator("attribute_filters")(
        classmethod(validate_attribute_filters)
    )


class ImageWithStatusPage(PaginationResponse[SingleImageWithStatus]):
    """
    Paginated page of images-with-status, plus dataset-level aggregations.

    Attributes:
        attributes: Mapping from attribute name to the list of values
            observed across the type-filtered image list.
        types: List of type names observed across the filtered image list.
    """

    attributes: dict[str, list[Any]]
    types: list[str]


router = APIRouter()
logger = set_logger(__name__)

Expand Down Expand Up @@ -213,15 +244,17 @@ async def get_history_run_units(
)


@router.get("/project/{project_id}/status/images/")
@router.post("/project/{project_id}/status/images/")
async def get_history_images(
project_id: int,
dataset_id: int,
workflowtask_id: int,
attributes_query: AttributesQuery,
unit_status: Optional[HistoryUnitStatusQuery] = None,
user: UserOAuth = Depends(current_active_user),
db: AsyncSession = Depends(get_async_db),
pagination: PaginationRequest = Depends(get_pagination_params),
) -> PaginationResponse[ZarrUrlAndStatus]:
) -> ImageWithStatusPage:

# Access control and object retrieval
wftask = await get_wftask_check_owner(
Expand Down Expand Up @@ -264,10 +297,14 @@ async def get_history_images(
actual_filters.update(type_filters_patch)
logger.debug(f"{prefix} {actual_filters=}")
# (1D) Get all matching images from the dataset
filtered_dataset_images = filter_image_list(
type_filtered_dataset_images = filter_image_list(
images=dataset.images,
type_filters=inferred_dataset_type_filters,
)
filtered_dataset_images = filter_image_list(
type_filtered_dataset_images,
attribute_filters=attributes_query.attribute_filters,
)
logger.debug(f"{prefix} {len(dataset.images)=}")
logger.debug(f"{prefix} {len(filtered_dataset_images)=}")
# (1E) Extract the list of URLs for filtered images
Expand All @@ -276,52 +313,84 @@ async def get_history_images(
)

# (2) Get `(zarr_url, status)` pairs for all images that have already
# been processed
res = await db.execute(
# been processed, and
# (3) When relevant, find images that have not been processed
base_stmt = (
select(HistoryImageCache.zarr_url, HistoryUnit.status)
.join(HistoryUnit)
.where(HistoryImageCache.dataset_id == dataset_id)
.where(HistoryImageCache.workflowtask_id == workflowtask_id)
.where(HistoryImageCache.latest_history_unit_id == HistoryUnit.id)
.where(HistoryImageCache.zarr_url.in_(filtered_dataset_images_url))
.order_by(HistoryImageCache.zarr_url)
)
list_processed_url_status = res.all()

if unit_status in [HistoryUnitStatusQuery.UNSET, None]:
stmt = base_stmt.order_by(HistoryImageCache.zarr_url)
res = await db.execute(stmt)
list_processed_url_status = res.all()
list_processed_url = list(
item[0] for item in list_processed_url_status
)
list_non_processed_url_status = list(
(url, None)
for url in filtered_dataset_images_url
if url not in list_processed_url
)
if unit_status == HistoryUnitStatusQuery.UNSET:
list_processed_url_status = []
else:
stmt = base_stmt.where(HistoryUnit.status == unit_status).order_by(
HistoryImageCache.zarr_url
)
res = await db.execute(stmt)
list_processed_url_status = res.all()
list_non_processed_url_status = []

logger.debug(f"{prefix} {len(list_processed_url_status)=}")
logger.debug(f"{prefix} {len(list_non_processed_url_status)=}")

# (3) Combine outputs from 1 and 2
list_processed_url = list(item[0] for item in list_processed_url_status)
logger.debug(f"{prefix} {len(list_processed_url)=}")

list_non_processed_url_status = list(
(url, None)
for url in filtered_dataset_images_url
if url not in list_processed_url
full_list_url_status = (
list_processed_url_status + list_non_processed_url_status
)
logger.debug(f"{prefix} {len(list_non_processed_url_status)=}")
logger.debug(f"{prefix} {len(full_list_url_status)=}")

attributes = aggregate_attributes(type_filtered_dataset_images)
types = aggregate_types(filtered_dataset_images)

sorted_list_url_status = sorted(
list_processed_url_status + list_non_processed_url_status,
full_list_url_status,
key=lambda url_status: url_status[0],
)
logger.debug(f"{prefix} {len(sorted_list_url_status)=}")

# Final list of objects
sorted_list_objects = list(
dict(zarr_url=url_status[0], status=url_status[1])
for url_status in sorted_list_url_status
)

total_count = len(sorted_list_objects)
total_count = len(sorted_list_url_status)
page_size = pagination.page_size or total_count

paginated_list_url_status = sorted_list_url_status[
(pagination.page - 1) * page_size : pagination.page * page_size
]

# Aggregate information to create 'SingleImageWithStatus'
items = [
{
**filtered_dataset_images[
filtered_dataset_images_url.index(url_status[0])
],
"status": url_status[1],
}
for url_status in paginated_list_url_status
]

return dict(
current_page=pagination.page,
page_size=page_size,
total_count=total_count,
items=sorted_list_objects[
(pagination.page - 1) * page_size : pagination.page * page_size
],
items=items,
attributes=attributes,
types=types,
)


Expand Down
14 changes: 4 additions & 10 deletions fractal_server/app/routes/api/v2/images.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
from fractal_server.images import SingleImage
from fractal_server.images import SingleImageUpdate
from fractal_server.images.models import AttributeFiltersType
from fractal_server.images.tools import aggregate_attributes
from fractal_server.images.tools import aggregate_types
from fractal_server.images.tools import find_image_by_zarr_url
from fractal_server.images.tools import match_filter

Expand Down Expand Up @@ -133,16 +135,8 @@ async def query_dataset_images(
dataset = output["dataset"]
images = dataset.images

attributes = {}
for image in images:
for k, v in image["attributes"].items():
attributes.setdefault(k, []).append(v)
for k, v in attributes.items():
attributes[k] = list(set(v))

types = list(
set(type for image in images for type in image["types"].keys())
)
attributes = aggregate_attributes(images)
types = aggregate_types(images)

if query is not None:

Expand Down
3 changes: 2 additions & 1 deletion fractal_server/app/schemas/v2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
from .history import HistoryRunReadAggregated # noqa F401
from .history import HistoryUnitRead # noqa F401
from .history import HistoryUnitStatus # noqa F401
from .history import HistoryUnitStatusQuery # noqa F401
from .history import ImageLogsRequest # noqa F401
from .history import ZarrUrlAndStatus # noqa F401
from .history import SingleImageWithStatus # noqa F401
from .job import JobCreateV2 # noqa F401
from .job import JobReadV2 # noqa F401
from .job import JobStatusTypeV2 # noqa F401
Expand Down
14 changes: 12 additions & 2 deletions fractal_server/app/schemas/v2/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from pydantic import BaseModel
from pydantic import field_serializer

from ....images import SingleImage


class HistoryUnitStatus(str, Enum):
"""
Expand All @@ -23,6 +25,15 @@ class HistoryUnitStatus(str, Enum):
FAILED = "failed"


class HistoryUnitStatusQuery(str, Enum):
    """
    Query-side variant of `HistoryUnitStatus`.

    Mirrors the three `HistoryUnitStatus` values and adds `UNSET`, used
    by the status/images endpoint to select images with no processing
    status yet.
    """

    SUBMITTED = "submitted"
    DONE = "done"
    FAILED = "failed"

    # Not a real unit status: selects images with no associated status.
    UNSET = "unset"


class HistoryUnitRead(BaseModel):
id: int
logfile: Optional[str] = None
Expand All @@ -49,6 +60,5 @@ class ImageLogsRequest(BaseModel):
zarr_url: str


class ZarrUrlAndStatus(BaseModel):
zarr_url: str
class SingleImageWithStatus(SingleImage):
    """
    A `SingleImage` enriched with its latest processing status.

    Attributes:
        status: Latest `HistoryUnit` status associated to this image, or
            `None` if the image has not been processed.
    """

    status: Optional[HistoryUnitStatus] = None
23 changes: 23 additions & 0 deletions fractal_server/images/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,26 @@ def merge_type_filters(
merged_dict = task_input_types
merged_dict.update(wftask_type_filters)
return merged_dict


def aggregate_attributes(images: list[dict[str, Any]]) -> dict[str, list[Any]]:
    """
    Aggregate image attributes into a key-to-distinct-values mapping.

    Args:
        images: List of image dictionaries, each carrying an
            `"attributes"` dict that maps attribute names to (hashable)
            values.

    Returns:
        Dictionary mapping each attribute name found in any image to the
        list of distinct values observed for it (list order unspecified).
    """
    # Accumulate directly into sets so values are deduplicated as we go,
    # instead of appending to per-key lists and deduplicating afterwards
    # with list(set(...)).
    attributes: dict[str, set[Any]] = {}
    for image in images:
        for key, value in image["attributes"].items():
            attributes.setdefault(key, set()).add(value)
    return {key: list(values) for key, values in attributes.items()}


def aggregate_types(images: list[dict[str, Any]]) -> list[str]:
    """
    Collect the distinct type names appearing in a list of images.

    Args:
        images: List of image dictionaries, each carrying a `"types"`
            dict keyed by type name.

    Returns:
        List of distinct type names (order unspecified).
    """
    type_names: set[str] = set()
    for image in images:
        type_names.update(image["types"].keys())
    return list(type_names)
Loading
Loading