
Commit

[perf] Implement shared HTTP client with connection pooling for device inference
charlieyl committed Feb 11, 2025
1 parent 5478f7d commit cbfa31c
Showing 1 changed file with 39 additions and 12 deletions.
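
In short, the commit replaces the per-request httpx.AsyncClient() usage with one process-wide client whose pool is tuned via httpx.Limits, so TCP/TLS connections are reused across inference calls instead of being re-established each time. A minimal sketch of the pattern (standalone illustration, not code from this commit):

import httpx

# One client per process: connections are pooled and kept alive,
# so repeated requests skip TCP/TLS setup.
shared_client = httpx.AsyncClient(
    timeout=60.0,
    limits=httpx.Limits(
        max_connections=200,           # cap on concurrent connections
        max_keepalive_connections=30,  # idle connections retained in the pool
        keepalive_expiry=30.0,         # seconds an idle connection is kept
    ),
)

async def probe(url: str) -> int:
    # Reuses a pooled connection when one is available.
    response = await shared_client.get(url)
    return response.status_code

(The actual change also passes http2=True, which requires httpx's optional HTTP/2 dependency, h2, to be installed.)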
@@ -12,6 +12,30 @@


 class FedMLHttpInference:
+    _shared_client = None
+
+    @classmethod
+    async def get_client(cls) -> httpx.AsyncClient:
+        """Get or create the shared HTTP client instance with connection pool"""
+        if cls._shared_client is None:
+            cls._shared_client = httpx.AsyncClient(
+                timeout=60.0,  # default request timeout (seconds)
+                limits=httpx.Limits(
+                    max_connections=200,  # maximum number of concurrent connections
+                    max_keepalive_connections=30,  # connections kept alive in the pool
+                    keepalive_expiry=30.0  # how long an idle connection stays alive (seconds)
+                ),
+                http2=True  # enable HTTP/2 support
+            )
+        return cls._shared_client
+
+    @classmethod
+    async def close_client(cls):
+        """Close the shared HTTP client if it exists"""
+        if cls._shared_client is not None:
+            await cls._shared_client.aclose()
+            cls._shared_client = None
+
     def __init__(self):
         pass

@@ -28,8 +52,9 @@ async def is_inference_ready(inference_url, path="ready", timeout=None):

         # TODO (Raphael): Support more methods and return codes rules.
         try:
-            async with httpx.AsyncClient() as client:
-                ready_response = await client.get(url=ready_url, timeout=timeout)
+            # async with httpx.AsyncClient() as client:
+            client = await FedMLHttpInference.get_client()
+            ready_response = await client.get(url=ready_url, timeout=timeout)

             if isinstance(ready_response, (Response, StreamingResponse)):
                 error_code = ready_response.status_code
@@ -88,22 +113,24 @@ async def run_http_inference_with_curl_request(


 async def stream_generator(inference_url, input_json, method="POST"):
-    async with httpx.AsyncClient() as client:
-        async with client.stream(method, inference_url, json=input_json,
-                                 timeout=ClientConstants.WORKER_STREAM_API_TIMEOUT) as response:
-            async for chunk in response.aiter_lines():
-                # we consumed a newline, need to put it back
-                yield f"{chunk}\n"
+    # async with httpx.AsyncClient() as client:
+    client = await FedMLHttpInference.get_client()
+    async with client.stream(method, inference_url, json=input_json,
+                             timeout=ClientConstants.WORKER_STREAM_API_TIMEOUT) as response:
+        async for chunk in response.aiter_lines():
+            # we consumed a newline, need to put it back
+            yield f"{chunk}\n"


 async def redirect_non_stream_req_to_worker(inference_type, inference_url, model_api_headers, model_inference_json,
                                             timeout=None, method="POST"):
     response_ok = True
     try:
-        async with httpx.AsyncClient() as client:
-            response = await client.request(
-                method=method, url=inference_url, headers=model_api_headers, json=model_inference_json, timeout=timeout
-            )
+        # async with httpx.AsyncClient() as client:
+        client = await FedMLHttpInference.get_client()
+        response = await client.request(
+            method=method, url=inference_url, headers=model_api_headers, json=model_inference_json, timeout=timeout
+        )
     except Exception as e:
         response_ok = False
         model_inference_result = {"error": e}
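The diff does not show a caller for close_client(); a minimal usage sketch (hypothetical wiring, assuming the hosting process controls startup and shutdown) of how the shared client would be obtained and eventually closed:

import asyncio

async def main():
    # Every call site shares the same pooled client.
    client = await FedMLHttpInference.get_client()
    response = await client.get("http://127.0.0.1:2345/ready")  # hypothetical worker endpoint
    print(response.status_code)

    # Release the pool (and its keep-alive sockets) once, on shutdown.
    await FedMLHttpInference.close_client()

asyncio.run(main())

Note that httpx.AsyncClient is bound to the event loop it runs on, so the shared client should be created and closed within the same loop.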
