Skip to content

Commit

Permalink
Extends ImageSpec to accept image names from plugin and have priority…
Browse files Browse the repository at this point in the history
… for plugins (flyteorg#2119)

* Extends image spec to have priority and accept image names from plugin

Signed-off-by: Thomas J. Fan <[email protected]>

* TST Simplify implemenation of registry

Signed-off-by: Thomas J. Fan <[email protected]>

* FIX Be more specific about max key

Signed-off-by: Thomas J. Fan <[email protected]>

* FIX Fixes failing test

Signed-off-by: Thomas J. Fan <[email protected]>

* DOC Adds comment about _IMAGE_NAME_TO_REAL_NAME

Signed-off-by: Thomas J. Fan <[email protected]>

---------

Signed-off-by: Thomas J. Fan <[email protected]>
  • Loading branch information
thomasjpfan authored Jan 24, 2024
1 parent f115157 commit 3966d1a
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 15 deletions.
48 changes: 34 additions & 14 deletions flytekit/image_spec/image_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dataclasses import asdict, dataclass
from functools import lru_cache
from importlib import metadata
from typing import List, Optional, Union
from typing import Dict, List, Optional, Tuple, Union

import click
import requests
Expand Down Expand Up @@ -48,7 +48,7 @@ class ImageSpec:

name: str = "flytekit"
python_version: str = None # Use default python in the base image if None.
builder: str = "envd"
builder: Optional[str] = None
source_root: Optional[str] = None
env: Optional[typing.Dict[str, str]] = None
registry: Optional[str] = None
Expand All @@ -71,9 +71,15 @@ def __post_init__(self):
self.registry = self.registry.lower()

def image_name(self) -> str:
"""
return full image name with tag.
"""
"""Full image name with tag."""
image_name = self._image_name()
try:
return ImageBuildEngine._IMAGE_NAME_TO_REAL_NAME[image_name]
except KeyError:
return image_name

def _image_name(self) -> str:
"""Construct full image name with tag."""
tag = calculate_hash_from_image_spec(self)
container_image = f"{self.name}:{tag}"
if self.registry:
Expand Down Expand Up @@ -178,12 +184,15 @@ def with_apt_packages(self, apt_packages: Union[str, List[str]]) -> "ImageSpec":

class ImageSpecBuilder:
@abstractmethod
def build_image(self, image_spec: ImageSpec):
def build_image(self, image_spec: ImageSpec) -> Optional[str]:
"""
Build the docker image and push it to the registry.
Args:
image_spec: image spec of the task.
Returns:
fully_qualified_image_name: Fully qualified image name. If None, then `image_spec.image_name()` is used.
"""
raise NotImplementedError("This method is not implemented in the base class.")

Expand All @@ -193,24 +202,33 @@ class ImageBuildEngine:
ImageBuildEngine contains a list of builders that can be used to build an ImageSpec.
"""

_REGISTRY: typing.Dict[str, ImageSpecBuilder] = {}
_REGISTRY: typing.Dict[str, Tuple[ImageSpecBuilder, int]] = {}
_BUILT_IMAGES: typing.Set[str] = set()
# _IMAGE_NAME_TO_REAL_NAME is used to keep track of the fully qualified image name
# returned by the image builder. This allows ImageSpec to map from `image_spc.image_name()`
# to the real qualified name.
_IMAGE_NAME_TO_REAL_NAME: Dict[str, str] = {}

@classmethod
def register(cls, builder_type: str, image_spec_builder: ImageSpecBuilder):
cls._REGISTRY[builder_type] = image_spec_builder
def register(cls, builder_type: str, image_spec_builder: ImageSpecBuilder, priority: int = 5):
cls._REGISTRY[builder_type] = (image_spec_builder, priority)

@classmethod
@lru_cache
def build(cls, image_spec: ImageSpec):
def build(cls, image_spec: ImageSpec) -> str:
if image_spec.builder is None and cls._REGISTRY:
builder = max(cls._REGISTRY, key=lambda name: cls._REGISTRY[name][1])
else:
builder = image_spec.builder

img_name = image_spec.image_name()
if img_name in cls._BUILT_IMAGES or image_spec.exist():
click.secho(f"Image {img_name} found. Skip building.", fg="blue")
else:
click.secho(f"Image {img_name} not found. Building...", fg="blue")
if image_spec.builder not in cls._REGISTRY:
raise Exception(f"Builder {image_spec.builder} is not registered.")
if image_spec.builder == "envd":
if builder not in cls._REGISTRY:
raise Exception(f"Builder {builder} is not registered.")
if builder == "envd":
envd_version = metadata.version("envd")
# flytekit v1.10.2+ copies the workflow code to the WorkDir specified in the Dockerfile. However, envd<0.3.39
# overwrites the WorkDir when building the image, resulting in a permission issue when flytekit downloads the file.
Expand All @@ -220,7 +238,9 @@ def build(cls, image_spec: ImageSpec):
f" Please upgrade envd to v0.3.39+."
)

cls._REGISTRY[image_spec.builder].build_image(image_spec)
fully_qualified_image_name = cls._REGISTRY[builder][0].build_image(image_spec)
if fully_qualified_image_name is not None:
cls._IMAGE_NAME_TO_REAL_NAME[img_name] = fully_qualified_image_name
cls._BUILT_IMAGES.add(img_name)


Expand Down
16 changes: 15 additions & 1 deletion plugins/flytekit-envd/tests/test_image_spec.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
from pathlib import Path
from textwrap import dedent

import pytest
from flytekitplugins.envd.image_builder import EnvdImageSpecBuilder, create_envd_config

from flytekit.image_spec.image_spec import ImageSpec
from flytekit.image_spec.image_spec import ImageBuildEngine, ImageSpec


@pytest.fixture(scope="module", autouse=True)
def register_envd_higher_priority():
# Register a new envd platform with the highest priority so the test in this file uses envd
highest_priority_builder = max(ImageBuildEngine._REGISTRY, key=ImageBuildEngine._REGISTRY.get)
highest_priority = ImageBuildEngine._REGISTRY[highest_priority_builder][1]
yield ImageBuildEngine.register(
"envd_high_priority",
EnvdImageSpecBuilder(),
priority=highest_priority + 1,
)
del ImageBuildEngine._REGISTRY["envd_high_priority"]


def test_image_spec():
Expand Down
22 changes: 22 additions & 0 deletions tests/flytekit/unit/core/image_spec/test_image_spec.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
from unittest.mock import Mock

import pytest

Expand Down Expand Up @@ -75,3 +76,24 @@ def test_image_spec(mock_image_spec_builder):
# ImageSpec should be immutable
image_spec.with_commands("ls")
assert image_spec.commands == ["echo hello"]


def test_image_spec_engine_priority():
image_spec = ImageSpec(name="FLYTEKIT")
image_name = image_spec.image_name()

new_image_name = f"fqn.xyz/{image_name}"
mock_image_builder_10 = Mock()
mock_image_builder_10.build_image.return_value = new_image_name
mock_image_builder_default = Mock()
mock_image_builder_default.build_image.side_effect = ValueError("should not be called")

ImageBuildEngine.register("build_10", mock_image_builder_10, priority=10)
ImageBuildEngine.register("build_default", mock_image_builder_default)

ImageBuildEngine.build(image_spec)
mock_image_builder_10.build_image.assert_called_once_with(image_spec)

assert image_spec.image_name() == new_image_name
del ImageBuildEngine._REGISTRY["build_10"]
del ImageBuildEngine._REGISTRY["build_default"]

0 comments on commit 3966d1a

Please sign in to comment.