Skip to content

Commit 41074fc

Browse files
committed
Add integration tests for pod events
1 parent 9bee787 commit 41074fc

File tree

2 files changed

+73
-0
lines changed

2 files changed

+73
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import asyncio
2+
3+
import anyio
4+
import pytest
5+
6+
from prefect import get_client
7+
from prefect.states import StateType
8+
from prefect_kubernetes_integration_tests.utils import display, prefect_core
9+
10+
DEFAULT_JOB_VARIABLES = {
11+
"image": "prefecthq/prefect:3.2.11-python3.12",
12+
}
13+
DEFAULT_PARAMETERS = {"n": 5}
14+
# Default source is a simple flow that sleeps
15+
DEFAULT_FLOW_SOURCE = "https://gist.github.com/772d095672484b76da40a4e6158187f0.git"
16+
DEFAULT_FLOW_ENTRYPOINT = "sleeping.py:sleepy"
17+
DEFAULT_FLOW_NAME = "pod-eviction-test"
18+
19+
20+
@pytest.mark.usefixtures("kind_cluster")
21+
async def test_happy_path_events(
22+
work_pool_name: str,
23+
):
24+
"""Test that flow runs properly handle pod evictions."""
25+
flow_run = await prefect_core.create_flow_run(
26+
source=DEFAULT_FLOW_SOURCE,
27+
entrypoint=DEFAULT_FLOW_ENTRYPOINT,
28+
name=DEFAULT_FLOW_NAME,
29+
work_pool_name=work_pool_name,
30+
job_variables=DEFAULT_JOB_VARIABLES,
31+
parameters=DEFAULT_PARAMETERS,
32+
)
33+
34+
display.print_flow_run_created(flow_run)
35+
36+
prefect_core.start_worker(work_pool_name, run_once=True)
37+
38+
async with get_client() as client:
39+
updated_flow_run = await client.read_flow_run(flow_run.id)
40+
41+
assert updated_flow_run.state is not None, "Flow run state should not be None"
42+
assert updated_flow_run.state.type == StateType.COMPLETED, (
43+
"Expected flow run to be COMPLETED. Got "
44+
f"{updated_flow_run.state.type} instead."
45+
)
46+
47+
display.print_flow_run_result(updated_flow_run)
48+
49+
events = []
50+
with anyio.move_on_after(10):
51+
while len(events) < 3:
52+
events = await prefect_core.read_pod_events_for_flow_run(flow_run.id)
53+
await asyncio.sleep(1)
54+
assert len(events) == 3, "Expected 3 events"

src/integrations/prefect-kubernetes/integration_tests/src/prefect_kubernetes_integration_tests/utils/prefect_core.py

+19
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from prefect import flow, get_client
1111
from prefect.client.schemas.objects import FlowRun
12+
from prefect.events.schemas.events import Event
1213
from prefect.states import StateType
1314

1415
console = Console()
@@ -80,3 +81,21 @@ def wait_for_flow_run_state(
8081
raise TimeoutError(
8182
f"Flow run {flow_run_id} did not reach state {target_state} within {timeout} seconds"
8283
)
84+
85+
86+
async def read_pod_events_for_flow_run(flow_run_id: UUID) -> list[Event]:
87+
"""Read events for a flow run."""
88+
async with get_client() as client:
89+
response = await client.request(
90+
"POST",
91+
"/events/filter",
92+
json={
93+
"filter": {
94+
"event": {"prefix": ["prefect.kubernetes.pod"]},
95+
"related": {
96+
"id": [f"prefect.flow-run.{flow_run_id}"],
97+
},
98+
},
99+
},
100+
)
101+
return [Event.model_validate(event) for event in response.json()["events"]]

0 commit comments

Comments
 (0)