问题提出
最近在实现一个转发大模型调用请求的中转功能。由于现在的大模型都支持流式与非流式两种调用方式,我在实现时自然而然地想到:要不要区分这两种调用方式,分别使用不同的代码逻辑?一开始我是把两种调用方式分开实现的,后来发现对于只做转发的服务端来说,这两种方式实际上没有区别,区别主要体现在客户端如何使用。
原理探寻
首先看一下两种调用方式实际返回的数据。
==== 非流式调用返回 ====
{
"choices": [
{
"finish_reason": "stop",
"index": 0,
"logprobs": null,
"message": {
"content": "Hello! How can I help you today?",
"reasoning_content": "用户现在打招呼说Hello!,我要友好回应。所以回复“Hello! How can I help you today?” 这样友好又开放,邀请用户说明需求。",
"role": "assistant"
}
}
],
"created": 1755675226,
"id": "021755675225006530d45f3e18d6c5c13bff8621e54f4f6ee920a",
"model": "doubao-1-5-thinking-vision-pro-250428",
"service_tier": "default",
"object": "chat.completion",
"usage": {
"completion_tokens": 45,
"prompt_tokens": 70,
"total_tokens": 115,
"prompt_tokens_details": {
"cached_tokens": 0
},
"completion_tokens_details": {
"reasoning_tokens": 36
}
}
}
==== 流式调用返回 ====
data: {"choices":[{"delta":{"content":"","reasoning_content":"用户","role":"assistant"},"index":0}],"created":1755675226,"id":"021755675226374530d45f3e18d6c5c13bff8621e54f4f6ec4025","model":"doubao-1-5-thinking-vision-pro-250428","service_tier":"default","object":"chat.completion.chunk","usage":null}
data: {"choices":[{"delta":{"content":"","reasoning_content":"现在","role":"assistant"},"index":0}],"created":1755675226,"id":"021755675226374530d45f3e18d6c5c13bff8621e54f4f6ec4025","model":"doubao-1-5-thinking-vision-pro-250428","service_tier":"default","object":"chat.completion.chunk","usage":null}
...........
data: {"choices":[{"delta":{"content":"","role":"assistant"},"finish_reason":"stop","index":0}],"created":1755675226,"id":"021755675226374530d45f3e18d6c5c13bff8621e54f4f6ec4025","model":"doubao-1-5-thinking-vision-pro-250428","service_tier":"default","object":"chat.completion.chunk","usage":null}
data: [DONE]
可以看出,两种请求返回的格式并不相同:非流式请求一次性返回一个完整的 JSON;而流式请求则逐行返回以 data: 开头的分块 JSON(每一块只携带本次增量 delta),最后以 data: [DONE] 标志结束。
服务端
在服务端(中转层)的实现中,实际上不必在意请求是流式还是非流式:我们可以统一以流的方式把上游的响应体转发给客户端。最终是否表现为流式,取决于约定的返回格式以及客户端的处理方式。
// Relay the upstream response to the client as-is: headers first, then the
// status code, then the body. The same path serves both streaming and
// non-streaming upstream responses, because the body is copied as a stream
// either way.
// NOTE(review): presumably c.Header replaces an existing value rather than
// appending — verify how multi-valued headers (e.g. Set-Cookie) behave here.
for name, values := range resp.Header {
	for i := range values {
		c.Header(name, values[i])
	}
}

// Mirror the upstream status code.
c.Status(resp.StatusCode)

// io.Copy reads resp.Body until EOF (or an error) and writes everything it
// reads to the client's body writer.
// NOTE(review): if the copy fails part-way, the upstream status has already
// been set and part of the body may already be written; confirm that emitting
// a 500 with an error string at that point is the intended behavior.
if _, err = io.Copy(c.Response.BodyWriter(), resp.Body); err != nil {
	hlog.CtxErrorf(ctx, "Error copying response body: %v", err)
	c.String(http.StatusInternalServerError, "Error copying response body: %v", err)
	return
}
当然,这里之所以可以采用统一的方式,是因为我们实现的只是转发功能。而在真正生成数据的原始服务端,两种方式的返回代码实际上是有所不同的。
//---------- Non-streaming output: traditional request handling ----------//
/**
 * GET /api/user-data
 *
 * Gathers every piece of data up front and answers with one complete JSON
 * document. Nothing is sent to the client until all three lookups have
 * returned.
 */
app.get('/api/user-data', (req, res) => {
  // All lookups run synchronously, one after another (may block the handler).
  const userData = fetchUserDataFromDB();
  const payload = {
    userData,
    analytics: generateUserAnalytics(userData),
    preferences: getUserPreferences(userData.id),
  };

  // Single, complete JSON response.
  res.json(payload);
});
//---------- Streaming output: chunked real-time data transfer ----------//
/**
 * GET /api/real-time-updates
 *
 * Streams data to the client as Server-Sent Events: one named `init` event
 * right away, then one `data:` frame per chunk emitted by the data stream.
 * The response is ended and the data stream destroyed as soon as either side
 * is finished: the source ends, the source fails, or the connection closes.
 */
app.get('/api/real-time-updates', (req, res) => {
  // 1. Set the streaming (SSE) response headers.
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  // 2. Send the initial event immediately.
  res.write('event: init\ndata: Connection established\n\n');

  // 3. Create the data stream.
  const dataStream = createRealTimeDataStream();

  // Idempotent teardown: it can be reached from several events for the same
  // connection, so ending the response is guarded.
  const shutDown = () => {
    dataStream.destroy();
    if (!res.writableEnded) {
      res.end();
    }
  };

  // 4. Send each chunk as its own SSE frame.
  dataStream.on('data', (chunk) => {
    res.write(`data: ${JSON.stringify(chunk)}\n\n`);
  });

  // 5. Finish the response when the source is done instead of leaving the
  //    connection open forever.
  dataStream.on('end', shutDown);

  // Without an 'error' listener, an error emitted by the stream would be an
  // unhandled 'error' event.
  dataStream.on('error', (err) => {
    console.error('Real-time data stream failed:', err);
    shutDown();
  });

  // 6. Handle the connection closing. Listen on the response rather than the
  //    request: since Node.js 16 the request's 'close' event signals that the
  //    request was completed, not that the underlying connection went away.
  res.on('close', shutDown);
});
客户端
客户端的处理方式则有明显的区别。
//---------- Non-streaming output: traditional request handling (plain Fetch API) ----------//
/**
 * Fetches the complete user payload from `/api/user-data` and renders all of
 * it in one pass.
 *
 * Any failure (network error, HTTP error status, invalid JSON, or an error
 * thrown while rendering) is logged and reported to the user through an error
 * toast; the returned promise never rejects.
 *
 * @returns {Promise<void>}
 */
async function loadUserData() {
  try {
    const response = await fetch('/api/user-data');

    // fetch() only rejects on network-level failures; an HTTP error status
    // (4xx/5xx) still resolves, so it has to be checked explicitly before the
    // body is treated as valid data.
    if (!response.ok) {
      throw new Error(`Request failed with status ${response.status}`);
    }

    const { userData, analytics, preferences } = await response.json();

    // Render everything at once.
    renderUserProfile(userData);
    renderAnalytics(analytics);
    updatePreferencesUI(preferences);
  } catch (error) {
    // Keep the cause visible for debugging instead of discarding it.
    console.error('Failed to load user data:', error);
    showErrorToast('Failed to load data');
  }
}
//---------- Streaming output: real-time data handling (EventSource) ----------//
// Open a Server-Sent Events connection to the streaming endpoint.
const eventSource = new EventSource('/api/real-time-updates');

// Named `init` event: signals that the stream has been established.
eventSource.addEventListener('init', () => {
  showConnectionStatus('Connected to real-time feed');
});

// Unnamed events carry one JSON-encoded update each; apply them as they arrive.
eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  updateLiveDashboard(data);
};

// EventSource reconnects on its own after a dropped connection. Calling
// close() here would cancel that reconnection, so only report the state:
// CLOSED means the browser has given up, anything else means a retry is
// under way.
eventSource.onerror = () => {
  if (eventSource.readyState === EventSource.CLOSED) {
    showConnectionStatus('Connection closed');
    return;
  }
  showConnectionStatus('Connection lost - attempting to reconnect...');
};