feat: process stream events #1184
base: main
Codecov Report
All modified and coverable lines are covered by tests ✅
Additional details and impacted files

@@            Coverage Diff             @@
##             main    #1184      +/-   ##
==========================================
+ Coverage   35.91%   35.96%   +0.05%
==========================================
  Files          69       69
  Lines       11576     9500    -2076
==========================================
- Hits         4157     3417     -740
+ Misses       7104     5767    -1337
- Partials      315      316       +1
type chatCompletionResponseConverter interface{}

// processStreamEvent takes the buffer out of the context, appends the new chunk to it, and then processes the complete events in the buffer
func processStreamEvent(
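I'd prefer to wrap this logic in main.go: assemble one or more complete SSE messages there and only then hand them to the provider, so the provider code doesn't need to change. That way, every chunk passed to the provider is guaranteed to be a complete SSE message.
Also, what processStreamEvent really does is iterate over incomplete messages, merge them, and return complete SSE messages on each call. That logic is generic and not limited to the AI Proxy plugin, so this utility function could be moved up to a higher layer.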
+1
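The reviewers' suggestion, reassembling chunks into complete SSE messages in one generic place before any provider sees them, could look roughly like the sketch below. It is a minimal, provider-agnostic version under stated assumptions: messages end with a blank line (`\n\n`), `\r\n` separators are not handled, and `splitSSEMessages` is a hypothetical name, not an existing higress helper. The caller is expected to keep the returned `rest` (for example in the request context) and pass it back with the next chunk.

```go
package main

import (
	"bytes"
	"fmt"
)

// splitSSEMessages is a hypothetical helper: it appends chunk to the bytes
// left over from earlier calls, returns every complete SSE message (text up
// to a blank-line separator, separator excluded), and returns the incomplete
// tail so the caller can keep it until the next chunk arrives.
// Assumption: messages are terminated by "\n\n"; "\r\n" is not handled.
func splitSSEMessages(buffered, chunk []byte) (messages []string, rest []byte) {
	// Copy into a fresh slice so the caller's buffers are never aliased.
	data := append(append([]byte{}, buffered...), chunk...)
	sep := []byte("\n\n")
	for {
		idx := bytes.Index(data, sep)
		if idx < 0 {
			break
		}
		messages = append(messages, string(data[:idx]))
		data = data[idx+len(sep):]
	}
	return messages, data
}

func main() {
	var buf []byte
	// The second event is split across two network chunks.
	for _, chunk := range []string{"data: {\"a\":1}\n\nda", "ta: {\"b\":2}\n\n"} {
		var msgs []string
		msgs, buf = splitSSEMessages(buf, []byte(chunk))
		for _, m := range msgs {
			fmt.Println(m)
		}
	}
	fmt.Printf("leftover: %q\n", buf)
}
```

Keeping the function pure (buffer in, messages and remainder out) is what makes it reusable outside AI Proxy: where the remainder is stored is left to the caller.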
	if isLastChunk || len(chunk) == 0 {
		return nil
	}
	// Take the buffer out of the context and append the new chunk to it
The SSE handling logic here feels rather complex. Could you take a look at this for reference:
higress/plugins/wasm-go/extensions/ai-cache/main.go
Lines 237 to 270 in 04a9104
func processSSEMessage(ctx wrapper.HttpContext, config PluginConfig, sseMessage string, log wrapper.Log) string {
	subMessages := strings.Split(sseMessage, "\n")
	var message string
	for _, msg := range subMessages {
		if strings.HasPrefix(msg, "data:") {
			message = msg
			break
		}
	}
	if len(message) < 6 {
		log.Errorf("invalid message:%s", message)
		return ""
	}
	// skip the prefix "data:"
	bodyJson := message[5:]
	if gjson.Get(bodyJson, config.CacheStreamValueFrom.ResponseBody).Exists() {
		tempContentI := ctx.GetContext(CacheContentContextKey)
		if tempContentI == nil {
			content := TrimQuote(gjson.Get(bodyJson, config.CacheStreamValueFrom.ResponseBody).Raw)
			ctx.SetContext(CacheContentContextKey, content)
			return content
		}
		append := TrimQuote(gjson.Get(bodyJson, config.CacheStreamValueFrom.ResponseBody).Raw)
		content := tempContentI.(string) + append
		ctx.SetContext(CacheContentContextKey, content)
		return content
	} else if gjson.Get(bodyJson, "choices.0.delta.content.tool_calls").Exists() {
		// TODO: compatible with other providers
		ctx.SetContext(ToolCallsContextKey, struct{}{})
		return ""
	}
	log.Debugf("unknown message:%s", bodyJson)
	return ""
}
This is for reference only: the logic itself is written to recognize the OpenAI protocol, so it isn't generic either.
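Since the referenced function is OpenAI-specific and also keeps only the first `data:` line of a message, a protocol-level helper may be easier to share across providers. The sketch below assumes its input is one complete SSE message (the output of a chunk-reassembly step) and follows the SSE event-stream rules: each `data` field adds one line, the lines are joined with `\n`, and at most one space after the colon is removed. `extractSSEData` is a hypothetical name; `event`, `id`, `retry`, and comment lines are ignored, and only `\n` and `\r\n` line endings are handled (a lone `\r` is not).

```go
package main

import (
	"fmt"
	"strings"
)

// extractSSEData (hypothetical) returns the event data carried by one
// complete SSE message and whether any data field was present. Every "data"
// field contributes one line, lines are joined with "\n", and a single space
// after the colon is dropped. Other fields and ':' comment lines are skipped.
func extractSSEData(message string) (string, bool) {
	var lines []string
	found := false
	for _, line := range strings.Split(message, "\n") {
		line = strings.TrimSuffix(line, "\r") // tolerate CRLF line endings
		switch {
		case line == "data":
			// A bare field name with no colon means an empty value.
			lines = append(lines, "")
			found = true
		case strings.HasPrefix(line, "data:"):
			value := strings.TrimPrefix(line, "data:")
			value = strings.TrimPrefix(value, " ") // strip at most one leading space
			lines = append(lines, value)
			found = true
		}
	}
	return strings.Join(lines, "\n"), found
}

func main() {
	data, ok := extractSSEData("event: message\ndata: first\ndata:second")
	fmt.Printf("%v %q\n", ok, data) // true "first\nsecond"

	_, ok = extractSSEData(": keep-alive comment")
	fmt.Println(ok) // false
}
```

A provider would then parse the returned payload (for example with gjson, as the quoted code does) without needing to know anything about `data:` prefixes or line splitting.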
Ⅰ. Describe what this PR did
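Extracts the SSE message-handling logic into a shared layer, so that when developing a provider you only need to handle the content of the messages themselves.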
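The division of labor described above, where main.go owns chunk reassembly and a provider only converts complete messages, can be illustrated with the sketch below. Every name in it (`contextStore`, `mapContext`, `messageHandler`, `handleStreamingBody`, `streamBufferKey`) is hypothetical, and `contextStore` only mimics the `GetContext`/`SetContext` calls that appear in the quoted ai-cache code, not the full plugin context API. It is not the PR's actual implementation.

```go
package main

import (
	"bytes"
	"fmt"
	"strings"
)

// contextStore is a hypothetical stand-in for the per-request plugin context;
// it mirrors only the GetContext/SetContext calls seen in the quoted code.
type contextStore interface {
	GetContext(key string) interface{}
	SetContext(key string, value interface{})
}

// mapContext is a map-backed contextStore for local experiments.
type mapContext map[string]interface{}

func (m mapContext) GetContext(key string) interface{}        { return m[key] }
func (m mapContext) SetContext(key string, value interface{}) { m[key] = value }

// messageHandler is what a provider would implement under this design: it
// sees one complete SSE message at a time and never deals with chunking.
type messageHandler func(message string) string

const streamBufferKey = "sseStreamBuffer" // hypothetical context key

// handleStreamingBody (hypothetical) keeps the unfinished tail of the stream
// in the request context, passes each complete message to the provider, and
// re-emits non-empty provider output as SSE messages.
func handleStreamingBody(ctx contextStore, chunk []byte, isLastChunk bool, handle messageHandler) []byte {
	var buf []byte
	if prev, ok := ctx.GetContext(streamBufferKey).([]byte); ok {
		buf = prev
	}
	buf = append(buf, chunk...)

	var out bytes.Buffer
	for {
		idx := bytes.Index(buf, []byte("\n\n"))
		if idx < 0 {
			break
		}
		if converted := handle(string(buf[:idx])); converted != "" {
			out.WriteString(converted + "\n\n")
		}
		buf = buf[idx+2:]
	}
	// Sketch-specific choice: on the last chunk, hand any unterminated tail
	// to the provider instead of dropping it.
	if isLastChunk && len(buf) > 0 {
		if converted := handle(string(buf)); converted != "" {
			out.WriteString(converted + "\n\n")
		}
		buf = nil
	}
	ctx.SetContext(streamBufferKey, buf)
	return out.Bytes()
}

func main() {
	ctx := mapContext{}
	upper := func(msg string) string { return strings.ToUpper(msg) } // toy "provider"
	fmt.Printf("%q\n", handleStreamingBody(ctx, []byte("data: a\n\ndata: b"), false, upper))
	fmt.Printf("%q\n", handleStreamingBody(ctx, []byte("\n\n"), false, upper))
	fmt.Printf("%q\n", handleStreamingBody(ctx, []byte("data: c"), true, upper))
}
```

Flushing the unterminated tail on the last chunk is a choice made for this sketch; code that returns early whenever `isLastChunk` is set may leave those buffered bytes unprocessed.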
Ⅱ. Does this pull request fix one issue?
fixes #1164
Ⅲ. Why don't you add test cases (unit test/integration test)?
Ⅳ. Describe how to verify it
qwen
claude
baidu
hunyuan
minimax
Ⅴ. Special notes for reviews