Skip to content

Commit 9c4c9cb

Browse files
authored
feat(service): disaggregated serving (#305)
* feat: perf perf perf reasoning streaming tasks go brrr Signed-off-by: Aaron Pham <[email protected]> * feat: disaggregated service Signed-off-by: Aaron Pham <[email protected]> * chore: depends on engine for disaggregated Signed-off-by: Aaron Pham <[email protected]> --------- Signed-off-by: Aaron Pham <[email protected]>
1 parent e726efe commit 9c4c9cb

File tree

2 files changed

+112
-108
lines changed

2 files changed

+112
-108
lines changed

python/asteraceae/requirements.txt

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
bentoml>=1.4.3
1+
bentoml>=1.4.5
2+
vllm==0.8.1
23
kantoku>=0.18.1
3-
openai>=1.61.0
4-
vllm==0.7.3
4+
openai>=1.67.0

python/asteraceae/service.py

+109-105
Original file line numberDiff line numberDiff line change
@@ -1,89 +1,112 @@
11
from __future__ import annotations
2-
import logging, traceback, asyncio
3-
import bentoml, fastapi, pydantic
42

5-
from typing import AsyncGenerator, List, Literal, Optional
6-
from annotated_types import Ge, Le
7-
from typing_extensions import Annotated
3+
import logging, traceback, os, asyncio, contextlib, typing as t
4+
import bentoml, fastapi, pydantic, annotated_types as ae, typing_extensions as te
85

96
logger = logging.getLogger(__name__)
10-
logger.setLevel(logging.INFO)
117

128
openai_api_app = fastapi.FastAPI()
139

14-
MAX_TOKENS = 8192
1510
MODEL_ID = 'deepseek-ai/DeepSeek-R1-Distill-Qwen-14B'
16-
17-
SYSTEM_PROMPT = """You are a professional writing assistant influenced by the styles of Raymond Carver, Franz Kafka, Albert Camus, Iain McGilchrist, and Ian McEwan. Your task is to provide suggestions to improve a user's writing by offering concise, meaningful additions that match the stylistic choices and tonality of the given essay excerpt.
11+
STRUCTURED_OUTPUT_BACKEND = "xgrammar:disable-any-whitespace" # remove any whitespace if it is not qwen.
12+
MAX_MODEL_LEN = int(os.environ.get("MAX_MODEL_LEN", 16 * 1024))
13+
MAX_TOKENS = int(os.environ.get("MAX_TOKENS", 8 * 1024))
14+
SYSTEM_PROMPT = """You are a professional writer heavily influenced by the styles of Raymond Carver, Franz Kafka, Albert Camus, Iain McGilchrist, and Ian McEwan. Your task is to provide suggestions by offering concise, meaningful additions that match the stylistic choices and tonality of the given essay excerpt.
1815
1916
Please follow these steps to generate a suggestion:
2017
2118
1. Analyze the excerpt, paying close attention to its style, tone, and central concept.
22-
2. Consider how Raymond Carver or Ian McEwan might approach expanding or enhancing the excerpt.
19+
2. Consider how you might approach expanding or enhancing the excerpt.
2320
3. Formulate a suggestion that builds upon the existing concept while maintaining a terse and authentic voice.
2421
4. Ensure your suggestion adds depth to the writing without drastically changing its original intent.
2522
26-
Before providing your final suggestion, wrap your analysis in <thought_process> tags. In this section:
27-
- List key stylistic elements and themes present in the excerpt
28-
- Identify specific influences from the mentioned authors
29-
- Brainstorm potential areas for improvement
30-
- Consider how each improvement aligns with the original style and tone
31-
32-
This will help ensure a thorough interpretation of the excerpt and a well-crafted suggestion. It's OK for this section to be quite long.
33-
3423
Guidelines for your suggestion:
3524
1. Keep it concise and authentic, typically one to two sentences.
3625
2. Focus on enhancing emotional depth, vivid imagery, or character insight.
3726
3. Maintain the overall tone and style of the original excerpt.
3827
4. Build upon the central concept or theme present in the excerpt.
28+
5. Make sure to provide minimum {num_suggestions} suggestions.
29+
"""
3930

40-
After your analysis, provide your final suggestion in <suggestion> tags.
31+
IMAGE = bentoml.images.PythonImage(python_version='3.11', lock_python_packages=False).requirements_file('requirements.txt').run('uv pip install --compile-bytecode flashinfer-python --find-links https://flashinfer.ai/whl/cu124/torch2.6')
4132

42-
Example output structure:
33+
class Suggestion(pydantic.BaseModel):
34+
suggestion: str
35+
reasoning: str = pydantic.Field(default='')
4336

44-
<thinking>
45-
[Your detailed analysis of the excerpt, considering style, tone, and concept]
46-
</thinking>
37+
class Suggestions(pydantic.BaseModel):
38+
suggestions: list[Suggestion]
4739

48-
<suggestion>
49-
[Your concise, meaningful suggestion to improve the writing]
50-
</suggestion>
5140

52-
Please proceed with your analysis and suggestion for the given essay excerpt."""
41+
@bentoml.asgi_app(openai_api_app, path='/v1')
42+
@bentoml.service(
43+
name='asteraceae-inference-engine',
44+
traffic={'timeout': 300, 'concurrency': 128},
45+
resources={'gpu': 1, 'gpu_type': 'nvidia-a100-80gb'},
46+
tracing={"sample_rate": 0.5},
47+
envs=[
48+
{'name': 'HF_TOKEN'},
49+
{'name': 'UV_NO_PROGRESS', 'value': '1'},
50+
{'name': 'HF_HUB_DISABLE_PROGRESS_BARS', 'value': '1'},
51+
{'name': 'VLLM_ATTENTION_BACKEND', 'value': 'FLASH_ATTN'},
52+
{'name': 'VLLM_USE_V1', 'value': '0'},
53+
{'name': 'VLLM_LOGGING_CONFIG_PATH', 'value': os.path.join(os.path.dirname(__file__), 'logging-config.json')},
54+
],
55+
labels={'owner': 'aarnphm', 'type': 'engine'},
56+
image=IMAGE,
57+
)
58+
class Engine:
59+
model_id = MODEL_ID
60+
model = bentoml.models.HuggingFaceModel(model_id, exclude=['*.pth', '*.pt', 'original/**/*'])
5361

62+
def __init__(self):
63+
self.exit_stack = contextlib.AsyncExitStack()
5464

55-
class Suggestion(pydantic.BaseModel):
56-
suggestion: str
65+
@bentoml.on_startup
66+
async def init_engine(self) -> None:
67+
import vllm.entrypoints.openai.api_server as vllm_api_server
5768

69+
from vllm.utils import FlexibleArgumentParser
70+
from vllm.entrypoints.openai.cli_args import make_arg_parser
71+
72+
args = make_arg_parser(FlexibleArgumentParser()).parse_args([])
73+
args.model = self.model
74+
args.disable_log_requests = True
75+
args.max_log_len = 1000
76+
args.served_model_name = [self.model_id]
77+
args.request_logger = None
78+
args.disable_log_stats = True
79+
args.use_tqdm_on_load = False
80+
args.max_model_len = MAX_MODEL_LEN
81+
args.enable_prefix_caching = True
82+
args.enable_reasoning = True
83+
args.reasoning_parser = 'deepseek_r1'
84+
args.enable_auto_tool_choice = True
85+
args.tool_call_parser = 'hermes'
86+
args.guided_decoding_backend = STRUCTURED_OUTPUT_BACKEND
87+
88+
router = fastapi.APIRouter(lifespan=vllm_api_server.lifespan)
89+
OPENAI_ENDPOINTS = [
90+
['/chat/completions', vllm_api_server.create_chat_completion, ['POST']],
91+
['/models', vllm_api_server.show_available_models, ['GET']],
92+
["/embeddings", vllm_api_server.create_embedding, ["POST"]],
93+
]
5894

59-
class ServerArgs(pydantic.BaseModel):
60-
model: str
61-
disable_log_requests: bool = True
62-
disable_log_stats: bool = True
63-
max_log_len: int = 1000
64-
response_role: str = 'assistant'
65-
served_model_name: Optional[List[str]] = None
66-
chat_template: Optional[str] = None
67-
chat_template_content_format: Literal['auto'] = 'auto'
68-
lora_modules: Optional[List[str]] = None
69-
prompt_adapters: Optional[List[str]] = None
70-
request_logger: Optional[str] = None
71-
return_tokens_as_token_ids: bool = False
72-
enable_tool_call_parser: bool = True
73-
enable_auto_tool_choice: bool = True
74-
enable_prompt_tokens_details: bool = False
75-
enable_reasoning: bool = False
76-
tool_call_parser: str = 'llama3_json'
77-
guided_decoding_backend: Literal['xgrammar', 'outlines'] = 'xgrammar'
78-
reasoning_parser: str = 'deepseek_r1'
79-
task: str = 'generate'
95+
for route, endpoint, methods in OPENAI_ENDPOINTS: router.add_api_route(path=route, endpoint=endpoint, methods=methods, include_in_schema=True)
96+
openai_api_app.include_router(router)
8097

98+
self.engine = await self.exit_stack.enter_async_context(vllm_api_server.build_async_engine_client(args))
99+
self.model_config = await self.engine.get_model_config()
100+
self.tokenizer = await self.engine.get_tokenizer()
101+
102+
await vllm_api_server.init_app_state(self.engine, self.model_config, openai_api_app.state, args)
103+
104+
@bentoml.on_shutdown
105+
async def teardown_engine(self): await self.exit_stack.aclose()
81106

82-
@bentoml.asgi_app(openai_api_app, path='/v1')
83107
@bentoml.service(
84-
name='asteraceae-inference-service',
108+
name='asteraceae-inference-api',
85109
traffic={'timeout': 300, 'concurrency': 128},
86-
resources={'gpu': 1, 'gpu_type': 'nvidia-a100-80gb'},
87110
http={
88111
'cors': {
89112
'enabled': True,
@@ -95,72 +118,53 @@ class ServerArgs(pydantic.BaseModel):
95118
'access_control_expose_headers': ['Content-Length'],
96119
}
97120
},
98-
envs=[{'name': 'HF_TOKEN'}, {'name': 'UV_COMPILE_BYTECODE', 'value': "1"}],
99-
image=bentoml.images.PythonImage(python_version='3.11').requirements_file('requirements.txt'),
121+
tracing={"sample_rate": 0.4},
122+
labels={'owner': 'aarnphm', 'type': 'api'},
123+
image=IMAGE,
100124
)
101-
class Engine:
102-
ref = bentoml.models.HuggingFaceModel(MODEL_ID, exclude=['*.pth'])
125+
class API:
126+
engine = bentoml.depends(Engine)
103127

104128
def __init__(self):
105-
from vllm import AsyncEngineArgs, AsyncLLMEngine
106-
import vllm.entrypoints.openai.api_server as vllm_api_server
107-
108-
ENGINE_ARGS = AsyncEngineArgs(model=self.ref, enable_prefix_caching=True, enable_chunked_prefill=True)
109-
self.engine = AsyncLLMEngine.from_engine_args(ENGINE_ARGS)
110-
111-
OPENAI_ENDPOINTS = [
112-
['/chat/completions', vllm_api_server.create_chat_completion, ['POST']],
113-
['/completions', vllm_api_server.create_completion, ['POST']],
114-
['/embeddings', vllm_api_server.create_embedding, ['POST']],
115-
['/models', vllm_api_server.show_available_models, ['GET']],
116-
]
117-
for route, endpoint, methods in OPENAI_ENDPOINTS:
118-
openai_api_app.add_api_route(path=route, endpoint=endpoint, methods=methods, include_in_schema=True)
129+
from openai import AsyncOpenAI
119130

120-
model_config = self.engine.engine.get_model_config()
121-
# NOTE: This is ok, given that all bentoml service is running within a event loop.
122-
asyncio.create_task( # noqa: RUF006
123-
vllm_api_server.init_app_state(self.engine, model_config, openai_api_app.state, ServerArgs(model=MODEL_ID))
124-
)
131+
self.client = AsyncOpenAI(base_url=f'{self.engine.client_url}/v1', api_key='dummy')
125132

126133
@bentoml.api
127134
async def suggests(
128135
self,
129136
essay: str,
130-
temperature: Annotated[float, Ge(0.5), Le(0.7)] = 0.6,
131-
max_tokens: Annotated[int, Ge(128), Le(MAX_TOKENS)] = MAX_TOKENS,
132-
num_suggestions: Annotated[int, Ge(1), Le(10)] = 5,
133-
min_suggestions: Annotated[int, Ge(1), Le(10)] = 3,
134-
) -> AsyncGenerator[str, None]:
135-
if min_suggestions >= num_suggestions:
136-
raise ValueError(f'min_suggestions ({min_suggestions}) must be less than num_suggestions ({num_suggestions})')
137-
138-
from openai import AsyncOpenAI
139-
from openai.types.chat import ChatCompletionSystemMessageParam, ChatCompletionUserMessageParam
140-
141-
client = AsyncOpenAI(base_url='http://127.0.0.1:3000/v1', api_key='dummy')
142-
143-
Output = pydantic.create_model(
144-
'Output',
145-
__module__=Suggestion.__module__,
146-
__base__=pydantic.BaseModel,
147-
suggestions=(pydantic.conlist(Suggestion, min_length=min_suggestions, max_length=num_suggestions), ...),
148-
)
149-
params = dict(guided_json=Output.model_json_schema())
137+
num_suggestions: te.Annotated[int, ae.Ge(2)] = 3,
138+
temperature: te.Annotated[float, ae.Ge(0.5), ae.Le(0.7)] = 0.6,
139+
max_tokens: te.Annotated[int, ae.Ge(256), ae.Le(MAX_TOKENS)] = MAX_TOKENS,
140+
) -> t.AsyncGenerator[str, None]:
141+
142+
messages = [
143+
{'role': 'system', 'content': SYSTEM_PROMPT.format(num_suggestions=num_suggestions)},
144+
{'role': 'user', 'content': essay},
145+
]
150146

147+
prefill = False
151148
try:
152-
completions = await client.chat.completions.create(
153-
model=MODEL_ID,
149+
completions = await self.client.chat.completions.create(
150+
model=Engine.inner.model_id,
154151
temperature=temperature,
155152
max_tokens=max_tokens,
156-
messages=[
157-
ChatCompletionSystemMessageParam(role='system', content=SYSTEM_PROMPT),
158-
ChatCompletionUserMessageParam(role='user', content=essay),
159-
],
153+
messages=messages,
160154
stream=True,
161-
extra_body=params,
155+
extra_body=dict(guided_json=Suggestions.model_json_schema()),
162156
)
163157
async for chunk in completions:
164-
yield chunk.choices[0].delta.content or ''
158+
delta_choice = chunk.choices[0].delta
159+
if hasattr(delta_choice, "reasoning_content"): s = Suggestion(suggestion=delta_choice.content or '', reasoning=delta_choice.reasoning_content)
160+
else: s = Suggestion(suggestion=delta_choice.content)
161+
if not prefill:
162+
prefill = True
163+
yield ''
164+
else:
165+
if not s.reasoning and not s.suggestion: break
166+
yield f'{s.model_dump_json()}\n'
165167
except Exception:
166-
yield traceback.format_exc()
168+
logger.error(traceback.format_exc())
169+
yield f'{Suggestion(suggestion="Internal error found. Check server logs for more information").model_dump_json()}\n'
170+
return

0 commit comments

Comments
 (0)