
[Serve] Deflake test_metrics #47750

Closed · GeneDer wants to merge 36 commits into master from deflak-test-metrics

Changes from all commits · 36 commits
a0d6bd0  right size tests (GeneDer, Sep 19, 2024)
20bb4b5  trigger another build (GeneDer, Sep 20, 2024)
e688c20  factor out test_metrics on its own and use large-sized test (GeneDer, Sep 20, 2024)
ecbc2f6  fix (GeneDer, Sep 20, 2024)
320d1ba  fix tag (GeneDer, Sep 20, 2024)
1870bdc  Merge branch 'master' into deflak-test-metrics (GeneDer, Sep 20, 2024)
70fbb9d  fix kwargs (GeneDer, Sep 20, 2024)
9165510  try again (GeneDer, Sep 20, 2024)
d15d0d0  test again (GeneDer, Sep 20, 2024)
2ffefce  test again (GeneDer, Sep 20, 2024)
8e22021  test again (GeneDer, Sep 20, 2024)
efceb03  revert change and add logic to clean up metrics between tests (GeneDer, Sep 24, 2024)
eb15873  lint (GeneDer, Sep 24, 2024)
9502dd4  check health for prometheus before cleanup (GeneDer, Sep 25, 2024)
71d1336  refactor clean up metrics as a fixture (GeneDer, Sep 25, 2024)
dab214c  test again (GeneDer, Sep 25, 2024)
22f9145  test again (GeneDer, Sep 25, 2024)
46ce3a0  test again (GeneDer, Sep 25, 2024)
3e63207  test again (GeneDer, Sep 25, 2024)
0a5cbf0  clean up serve and ray before and after the tests (GeneDer, Sep 28, 2024)
04ee77c  try again (GeneDer, Sep 28, 2024)
869594b  try again (GeneDer, Sep 28, 2024)
0838d2a  Merge branch 'master' into deflak-test-metrics (GeneDer, Sep 30, 2024)
25cc12a  Merge branch 'master' into deflak-test-metrics (GeneDer, Sep 30, 2024)
ff1a839  Merge branch 'master' into deflak-test-metrics (GeneDer, Oct 1, 2024)
1482c03  try again (GeneDer, Oct 1, 2024)
e9a88c6  try again (GeneDer, Oct 2, 2024)
747d479  only decrement num_scheduling_tasks_in_backoff if it's greater than 0 (GeneDer, Oct 2, 2024)
79fce0e  try again (GeneDer, Oct 2, 2024)
d924b7e  try again (GeneDer, Oct 3, 2024)
650452c  try again (GeneDer, Oct 3, 2024)
e0aa69c  wait for proxies to be healthy before starting any tests (GeneDer, Oct 3, 2024)
64d88a7  try again (GeneDer, Oct 3, 2024)
853662d  Merge branch 'master' into deflak-test-metrics (GeneDer, Oct 4, 2024)
709a0a9  try again (GeneDer, Oct 4, 2024)
00a45bf  try again (GeneDer, Oct 4, 2024)
1 change: 0 additions & 1 deletion python/ray/serve/tests/BUILD
@@ -443,4 +443,3 @@ py_test_module_list(
"//python/ray/serve:serve_lib",
],
)

87 changes: 64 additions & 23 deletions python/ray/serve/tests/test_metrics.py
@@ -16,6 +16,7 @@
fetch_prometheus_metrics,
wait_for_condition,
)
from ray.serve._private.common import ProxyStatus
from ray.serve._private.constants import DEFAULT_LATENCY_BUCKET_MS
from ray.serve._private.long_poll import LongPollHost, UpdatedObject
from ray.serve._private.test_utils import (
@@ -35,6 +36,10 @@
@pytest.fixture
def serve_start_shutdown():
"""Fixture provides a fresh Ray cluster to prevent metrics state sharing."""
Contributor: Hm, shouldn't this be sufficient on its own? The Prometheus endpoint is the raylet, so if Ray is shut down between runs there should be no state sharing. What am I missing?

Contributor (author): My expectation is that something isn't being cleaned up between those tests, and in fact adding those calls seems to have helped. Thinking it through again, maybe adding some sleep in between would help in the same way, and maybe the issue is that Serve and/or Ray wasn't completely shut down before the next test started? 🤔 Let me do some more experiments.

Contributor: We should not add any sleeps -- if we need to wait for anything to clean up, then explicitly wait for the cleanup to happen. Sleeps are what make things flaky in the first place.
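
A minimal sketch of that explicit-wait approach (hypothetical helper name; assumes wait_for_condition comes from ray._private.test_utils, matching the import at the top of this file). The fixture below ended up using plain while loops instead:

import ray
from ray import serve
from ray._private.test_utils import wait_for_condition


def shutdown_serve_and_ray():
    # Serve tears down its controller, proxies, and applications
    # asynchronously, so poll the status report until nothing is left
    # rather than sleeping for a fixed interval.
    serve.shutdown()
    wait_for_condition(
        lambda: len(serve.status().proxies) == 0
        and len(serve.status().applications) == 0,
        timeout=30,
    )
    # ray.shutdown() disconnects the driver from the local Ray instance;
    # verify it took effect before the next test calls ray.init().
    ray.shutdown()
    assert not ray.is_initialized()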

    while len(serve.status().proxies) > 0 or len(serve.status().applications) > 0:
        serve.shutdown()
    while ray.is_initialized():
        ray.shutdown()
    ray.init(
        _metrics_export_port=TEST_METRICS_EXPORT_PORT,
        _system_config={
@@ -53,11 +58,27 @@ def serve_start_shutdown():
            grpc_servicer_functions=grpc_servicer_functions,
        ),
    )
    serve.shutdown()
    ray.shutdown()
    while len(serve.status().proxies) > 0 or len(serve.status().applications) > 0:
        serve.shutdown()
    while ray.is_initialized():
        ray.shutdown()
    ray._private.utils.reset_ray_address()


@pytest.fixture
def wait_for_health_proxies():
    def check():
        return all(
            [
                status == ProxyStatus.HEALTHY
                for status in serve.status().proxies.values()
            ]
        )

    wait_for_condition(check)
    yield
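
Note: pytest runs this fixture before each test that lists it as an argument, so the test body only starts once every proxy reports ProxyStatus.HEALTHY; the bare yield means the fixture does no teardown work after the test.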


def extract_tags(line: str) -> Dict[str, str]:
"""Extracts any tags from the metrics line."""

@@ -193,7 +214,9 @@ def metric_available() -> bool:
return metric_dicts


def test_serve_metrics_for_successful_connection(serve_start_shutdown):
def test_serve_metrics_for_successful_connection(
    serve_start_shutdown, wait_for_health_proxies
):
@serve.deployment(name="metrics")
async def f(request):
return "hello"
@@ -258,7 +281,7 @@ def verify_metrics(do_assert=False):
verify_metrics(do_assert=True)


def test_http_replica_gauge_metrics(serve_start_shutdown):
def test_http_replica_gauge_metrics(serve_start_shutdown, wait_for_health_proxies):
"""Test http replica gauge metrics"""
signal = SignalActor.remote()

@@ -291,7 +314,7 @@ def ensure_request_processing():
wait_for_condition(ensure_request_processing, timeout=5)


def test_proxy_metrics_not_found(serve_start_shutdown):
def test_proxy_metrics_not_found(serve_start_shutdown, wait_for_health_proxies):
# NOTE: These metrics should be documented at
# https://docs.ray.io/en/latest/serve/monitoring.html#metrics
# Any updates here should be reflected there too.
@@ -333,7 +356,7 @@ def verify_metrics(_expected_metrics, do_assert=False):
verify_metrics,
retry_interval_ms=1000,
timeout=10,
expected_metrics=expected_metrics,
_expected_metrics=expected_metrics,
)
except RuntimeError:
verify_metrics(expected_metrics, True)
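
For context on the _expected_metrics= rename (the "fix kwargs" commit): wait_for_condition forwards any extra keyword arguments to the predicate it polls, so the keyword must match the predicate's parameter name exactly. A minimal sketch of the pattern, with a placeholder predicate and a hypothetical metric list, again assuming wait_for_condition from ray._private.test_utils:

from ray._private.test_utils import wait_for_condition


def verify_metrics(_expected_metrics, do_assert=False):
    # Placeholder predicate; the real test scrapes Prometheus output and
    # checks that every name in _expected_metrics appears there.
    return True


# Extra kwargs are passed through to the predicate on every retry. With the
# old spelling expected_metrics=, each call would fail with a TypeError, the
# retry loop would swallow it, and the wait would time out with the
# RuntimeError handled above.
wait_for_condition(
    verify_metrics,
    retry_interval_ms=1000,
    timeout=10,
    _expected_metrics=["serve_num_http_requests"],  # hypothetical value
)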
@@ -383,7 +406,7 @@ def verify_error_count(do_assert=False):
verify_error_count(do_assert=True)


def test_proxy_metrics_internal_error(serve_start_shutdown):
def test_proxy_metrics_internal_error(serve_start_shutdown, wait_for_health_proxies):
# NOTE: These metrics should be documented at
# https://docs.ray.io/en/latest/serve/monitoring.html#metrics
# Any updates here should be reflected there too.
@@ -434,7 +457,7 @@ async def __call__(self, *args):
verify_metrics,
retry_interval_ms=1000,
timeout=10,
expected_metrics=expected_metrics,
_expected_metrics=expected_metrics,
)
except RuntimeError:
verify_metrics(expected_metrics, True)
@@ -478,7 +501,7 @@ def verify_error_count(do_assert=False):
verify_error_count(do_assert=True)


def test_proxy_metrics_fields_not_found(serve_start_shutdown):
def test_proxy_metrics_fields_not_found(serve_start_shutdown, wait_for_health_proxies):
"""Tests the proxy metrics' fields' behavior for not found."""

# Should generate 404 responses
@@ -522,7 +545,9 @@ def test_proxy_metrics_fields_not_found(serve_start_shutdown):
print("serve_num_grpc_error_requests working as expected.")


def test_proxy_metrics_fields_internal_error(serve_start_shutdown):
def test_proxy_metrics_fields_internal_error(
    serve_start_shutdown, wait_for_health_proxies
):
"""Tests the proxy metrics' fields' behavior for internal error."""

@serve.deployment()
@@ -583,7 +608,7 @@ def f(*args):
print("serve_grpc_request_latency_ms_sum working as expected.")


def test_replica_metrics_fields(serve_start_shutdown):
def test_replica_metrics_fields(serve_start_shutdown, wait_for_health_proxies):
"""Test replica metrics fields"""

@serve.deployment
@@ -722,7 +747,9 @@ def verify_metrics(self, metric, expected_output):
for key in expected_output:
assert metric[key] == expected_output[key]

def test_request_context_pass_for_http_proxy(self, serve_start_shutdown):
def test_request_context_pass_for_http_proxy(
    self, serve_start_shutdown, wait_for_health_proxies
):
"""Test HTTP proxy passing request context"""

@serve.deployment(graceful_shutdown_timeout_s=0.001)
@@ -816,7 +843,9 @@ def check():
assert metrics_app_name["g"] == "app2", msg
assert metrics_app_name["h"] == "app3", msg

def test_request_context_pass_for_grpc_proxy(self, serve_start_shutdown):
def test_request_context_pass_for_grpc_proxy(
    self, serve_start_shutdown, wait_for_health_proxies
):
"""Test gRPC proxy passing request context"""

@serve.deployment(graceful_shutdown_timeout_s=0.001)
@@ -912,7 +941,9 @@ def check():
assert metrics_app_name[depl_name2] == "app2", msg
assert metrics_app_name[depl_name3] == "app3", msg

def test_request_context_pass_for_handle_passing(self, serve_start_shutdown):
def test_request_context_pass_for_handle_passing(
    self, serve_start_shutdown, wait_for_health_proxies
):
"""Test handle passing contexts between replicas"""

@serve.deployment
@@ -970,7 +1001,9 @@ async def app2(self):
assert requests_metrics_app_name["g1"] == "app"
assert requests_metrics_app_name["g2"] == "app"

def test_customer_metrics_with_context(self, serve_start_shutdown):
def test_customer_metrics_with_context(
    self, serve_start_shutdown, wait_for_health_proxies
):
@serve.deployment
class Model:
def __init__(self):
@@ -1062,7 +1095,9 @@ def __call__(self):
self.verify_metrics(histogram_metrics[0], expected_metrics)

@pytest.mark.parametrize("use_actor", [False, True])
def test_serve_metrics_outside_serve(self, use_actor, serve_start_shutdown):
def test_serve_metrics_outside_serve(
    self, use_actor, serve_start_shutdown, wait_for_health_proxies
):
"""Make sure ray.serve.metrics work in ray actor"""
if use_actor:

@@ -1186,7 +1221,7 @@ async def __call__(self):
self.verify_metrics(histogram_metrics[0], expected_metrics)


def test_multiplexed_metrics(serve_start_shutdown):
def test_multiplexed_metrics(serve_start_shutdown, wait_for_health_proxies):
"""Tests multiplexed API corresponding metrics."""

@serve.deployment
@@ -1261,7 +1296,7 @@ async def call(self, *args):


class TestHandleMetrics:
def test_queued_queries_basic(self, serve_start_shutdown):
def test_queued_queries_basic(self, serve_start_shutdown, wait_for_health_proxies):
signal = SignalActor.options(name="signal123").remote()
serve.run(WaitForSignal.options(max_ongoing_requests=1).bind(), name="app1")

@@ -1290,7 +1325,9 @@ def test_queued_queries_basic(self, serve_start_shutdown):
expected=0,
)

def test_queued_queries_multiple_handles(self, serve_start_shutdown):
def test_queued_queries_multiple_handles(
    self, serve_start_shutdown, wait_for_health_proxies
):
signal = SignalActor.options(name="signal123").remote()
serve.run(WaitForSignal.options(max_ongoing_requests=1).bind(), name="app1")

@@ -1330,7 +1367,9 @@ def test_queued_queries_multiple_handles(self, serve_start_shutdown):
expected=0,
)

def test_queued_queries_disconnected(self, serve_start_shutdown):
def test_queued_queries_disconnected(
    self, serve_start_shutdown, wait_for_health_proxies
):
"""Check that disconnected queued queries are tracked correctly."""

signal = SignalActor.remote()
@@ -1471,7 +1510,9 @@ def do_request():
# Unblock hanging request.
ray.get(signal.send.remote())

def test_running_requests_gauge(self, serve_start_shutdown):
def test_running_requests_gauge(
    self, serve_start_shutdown, wait_for_health_proxies
):
signal = SignalActor.options(name="signal123").remote()
serve.run(
Router.options(num_replicas=2, ray_actor_options={"num_cpus": 0}).bind(
@@ -1531,7 +1572,7 @@ def test_running_requests_gauge(self, serve_start_shutdown):
)


def test_long_poll_host_sends_counted(serve_instance):
def test_long_poll_host_sends_counted(serve_start_shutdown, wait_for_health_proxies):
"""Check that the transmissions by the long_poll are counted."""

host = ray.remote(LongPollHost).remote(
@@ -1588,7 +1629,7 @@ def test_long_poll_host_sends_counted(serve_instance):
)


def test_actor_summary(serve_instance):
def test_actor_summary(serve_start_shutdown, wait_for_health_proxies):
@serve.deployment
def f():
pass
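
Note: test_long_poll_host_sends_counted and test_actor_summary previously used the shared serve_instance fixture; switching them to serve_start_shutdown (with wait_for_health_proxies) gives each test a fresh Ray cluster, so metrics emitted by earlier tests cannot leak into their assertions.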