diff --git a/python/llm/src/ipex_llm/vllm/xpu/entrypoints/openai/api_server.py b/python/llm/src/ipex_llm/vllm/xpu/entrypoints/openai/api_server.py
index 709ba8de6c4..4c16c12f1c2 100644
--- a/python/llm/src/ipex_llm/vllm/xpu/entrypoints/openai/api_server.py
+++ b/python/llm/src/ipex_llm/vllm/xpu/entrypoints/openai/api_server.py
@@ -361,6 +361,33 @@ async def show_version():
     return JSONResponse(content=ver)
 
 
+save_dict = {}
+import os
+flag = os.getenv("VLLM_LOG_OUTPUT", None)
+async def stream_generator(generator, request, request_id):
+    async for chunk in generator:
+        if request_id not in save_dict:
+            save_dict[request_id] = ""
+        import json
+        try:
+            data = chunk.strip()
+            if data.startswith('data: '):
+                data = data[len('data: '):]
+            else:
+                yield chunk
+            json_data = json.loads(data)
+            if 'choices' in json_data and len(json_data['choices']) > 0:
+                choice = json_data['choices'][0]
+                if 'delta' in choice:
+                    save_dict[request_id] += choice["delta"]["content"]
+                elif 'text' in choice:
+                    save_dict[request_id] += choice["text"]
+        except json.JSONDecodeError:
+            print(f"Received request_id: {request_id}, request: {request} content: {save_dict[request_id]}")
+            pass # Done
+        yield chunk
+
+
 @router.post("/v1/chat/completions")
 @with_cancellation
 async def create_chat_completion(request: ChatCompletionRequest,
@@ -370,6 +397,11 @@ async def create_chat_completion(request: ChatCompletionRequest,
         return base(raw_request).create_error_response(
             message="The model does not support Chat Completions API")
 
+    if flag is not None:
+        request_id = "chatcmpl-" \
+            f"{handler._base_request_id(raw_request, request.request_id)}"
+        print(f"First received request_id: {request_id}, request: {request}")
+
     generator = await handler.create_chat_completion(request, raw_request)
 
     if isinstance(generator, ErrorResponse):
@@ -377,8 +409,12 @@ async def create_chat_completion(request: ChatCompletionRequest,
                             status_code=generator.code)
 
     elif isinstance(generator, ChatCompletionResponse):
+        if flag is not None:
+            print(f"Received request-id:{request_id}, request:{request}, Output:{generator.model_dump()}")
         return JSONResponse(content=generator.model_dump())
 
+    if flag is not None:
+        return StreamingResponse(content=stream_generator(generator, request, request_id), media_type="text/event-stream")
     return StreamingResponse(content=generator, media_type="text/event-stream")
 
 
@@ -390,13 +426,21 @@ async def create_completion(request: CompletionRequest, raw_request: Request):
         return base(raw_request).create_error_response(
             message="The model does not support Completions API")
 
+    if flag is not None:
+        request_id = f"cmpl-{handler._base_request_id(raw_request)}"
+        print(f"First received request_id: {request_id}, request: {request}")
+
     generator = await handler.create_completion(request, raw_request)
 
     if isinstance(generator, ErrorResponse):
         return JSONResponse(content=generator.model_dump(),
                             status_code=generator.code)
 
     elif isinstance(generator, CompletionResponse):
+        if flag is not None:
+            print(f"Received request-id:{request_id}, request:{request}, Output:{generator.model_dump()}")
         return JSONResponse(content=generator.model_dump())
-
+
+    if flag is not None:
+        return StreamingResponse(content=stream_generator(generator, request, request_id), media_type="text/event-stream")
     return StreamingResponse(content=generator, media_type="text/event-stream")
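
Note: the hunks above gate all new logging behind the VLLM_LOG_OUTPUT environment variable; when it is unset the endpoints behave exactly as before. Non-streaming responses are printed directly, while streaming responses are wrapped in stream_generator, which forwards each SSE chunk to the client while accumulating the per-request text and prints the accumulated output once a non-JSON chunk (such as the final "data: [DONE]" line) raises JSONDecodeError. The standalone sketch below illustrates that accumulation logic against hypothetical SSE chunks. It is not part of the patch: the names log_stream and fake_sse_stream are illustrative, and the logic is simplified (a single yield per chunk, missing 'content' keys tolerated) rather than a copy of the patched code.

import asyncio
import json

save_dict = {}  # request_id -> accumulated completion text


async def log_stream(generator, request, request_id):
    # Forward each SSE chunk unchanged while accumulating the streamed text.
    async for chunk in generator:
        save_dict.setdefault(request_id, "")
        try:
            data = chunk.strip()
            if data.startswith("data: "):
                data = data[len("data: "):]
            payload = json.loads(data)
            if payload.get("choices"):
                choice = payload["choices"][0]
                if "delta" in choice:
                    save_dict[request_id] += choice["delta"].get("content", "")
                elif "text" in choice:
                    save_dict[request_id] += choice["text"]
        except json.JSONDecodeError:
            # The non-JSON "data: [DONE]" sentinel ends the stream:
            # log the request together with the full accumulated output.
            print(f"request_id: {request_id}, request: {request}, "
                  f"content: {save_dict[request_id]}")
        yield chunk


async def fake_sse_stream():
    # Hypothetical chunks shaped like the chat-completions delta format.
    for piece in ("Hello", ", ", "world"):
        event = {"choices": [{"delta": {"content": piece}}]}
        yield f"data: {json.dumps(event)}\n\n"
    yield "data: [DONE]\n\n"


async def main():
    async for chunk in log_stream(fake_sse_stream(), request="demo request",
                                  request_id="chatcmpl-0"):
        pass  # in the server, each forwarded chunk is what reaches the client


asyncio.run(main())

In the patched server the equivalent path is exercised by setting VLLM_LOG_OUTPUT to any value before starting the OpenAI-compatible server; leaving it unset disables the logging entirely.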