Skip to content

Commit

Permalink
fix fake gcs client to accept str
Browse files Browse the repository at this point in the history
Signed-off-by: Ruiyang Wang <[email protected]>
  • Loading branch information
rynewang committed Jan 30, 2025
1 parent b0286cd commit a7ae16b
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 14 deletions.
6 changes: 3 additions & 3 deletions python/ray/dashboard/modules/job/job_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,12 +291,12 @@ async def _fetch_agent_info(self, target_node_id: NodeID) -> Tuple[str, int, int
)
if not value:
# Agent info not found, retry
raise Exception()
raise Exception("Agent info not found in internal kv")
return json.loads(value.decode())

except Exception:
except Exception as e:
logger.info(
f"Failed to fetch agent info for node {target_node_id}, retrying in {TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS} seconds..."
f"Failed to fetch agent info for node {target_node_id}: {e}. Retrying in {TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS} seconds..."
)
await asyncio.sleep(TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS)

Expand Down
50 changes: 40 additions & 10 deletions python/ray/dashboard/modules/job/tests/test_http_job_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import tempfile
import time
from pathlib import Path
from typing import Optional, List
from typing import Optional, List, Union, Dict
from unittest.mock import patch

import pytest
Expand All @@ -23,7 +23,10 @@
wait_for_condition,
wait_until_server_available,
)
from ray.dashboard.consts import DASHBOARD_AGENT_ADDR_NODE_ID_PREFIX
from ray.dashboard.consts import (
DASHBOARD_AGENT_ADDR_NODE_ID_PREFIX,
DASHBOARD_AGENT_ADDR_IP_PREFIX,
)
from ray.dashboard.modules.dashboard_sdk import ClusterInfo, parse_cluster_info
from ray.dashboard.modules.job.job_head import JobHead
from ray.dashboard.modules.job.pydantic_models import JobDetails
Expand Down Expand Up @@ -741,29 +744,46 @@ async def test_job_head_pick_random_job_agent(monkeypatch):
# Fake GCS client
class _FakeGcsClient:
def __init__(self):
self._kv = {}
self._kv: Dict[bytes, bytes] = {}

@staticmethod
def ensure_bytes(key: Union[bytes, str]) -> bytes:
return key.encode() if isinstance(key, str) else key

async def internal_kv_put(self, key: bytes, value: bytes, **kwargs):
async def internal_kv_put(
self, key: Union[bytes, str], value: bytes, **kwargs
):
key = self.ensure_bytes(key)
self._kv[key] = value

async def internal_kv_get(self, key: bytes, **kwargs):
async def internal_kv_get(self, key: Union[bytes, str], **kwargs):
key = self.ensure_bytes(key)
return self._kv.get(key, None)

async def internal_kv_multi_get(self, keys: List[bytes], **kwargs):
return {key: self._kv.get(key, None) for key in keys}
async def internal_kv_multi_get(
self, keys: List[Union[bytes, str]], **kwargs
):
return {key: self.internal_kv_get(key) for key in keys}

async def internal_kv_del(self, key: bytes, **kwargs):
async def internal_kv_del(self, key: Union[bytes, str], **kwargs):
key = self.ensure_bytes(key)
self._kv.pop(key)

async def internal_kv_keys(self, prefix: bytes, **kwargs):
async def internal_kv_keys(self, prefix: Union[bytes, str], **kwargs):
prefix = self.ensure_bytes(prefix)
return [key for key in self._kv.keys() if key.startswith(prefix)]

class MockJobHead(JobHead):
def __init__(self):
self._agents = dict()
self._gcs_aio_client = _FakeGcsClient()

@property
def gcs_aio_client(self):
# Overrides JobHead.gcs_aio_client
return self._gcs_aio_client

job_head = MockJobHead()
job_head._gcs_aio_client = _FakeGcsClient()

async def add_agent(agent):
node_id = agent[0]
Expand All @@ -776,13 +796,23 @@ async def add_agent(agent):
json.dumps([node_ip, http_port, grpc_port]).encode(),
namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
)
await job_head._gcs_aio_client.internal_kv_put(
f"{DASHBOARD_AGENT_ADDR_IP_PREFIX}{node_ip}".encode(),
json.dumps([node_id.hex(), http_port, grpc_port]).encode(),
namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
)

async def del_agent(agent):
node_id = agent[0]
node_ip = agent[1]["ipAddress"]
await job_head._gcs_aio_client.internal_kv_del(
f"{DASHBOARD_AGENT_ADDR_NODE_ID_PREFIX}{node_id.hex()}".encode(),
namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
)
await job_head._gcs_aio_client.internal_kv_del(
f"{DASHBOARD_AGENT_ADDR_IP_PREFIX}{node_ip}".encode(),
namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
)

head_node_id = NodeID.from_random()
await job_head._gcs_aio_client.internal_kv_put(
Expand Down
2 changes: 1 addition & 1 deletion python/ray/dashboard/modules/reporter/reporter_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ async def _get_stub_address_by_ip(
if not agent_addr_json:
return None
node_id, http_port, grpc_port = json.loads(agent_addr_json)
return node_id, ip, http_port, grpc_port
return NodeID.from_hex(node_id), ip, http_port, grpc_port

def _make_stub(
self, ip_port: str
Expand Down

0 comments on commit a7ae16b

Please sign in to comment.