Sorry, your browser cannot access this site
This page requires browser support (enable) JavaScript
Learn more >

Fiber中的流式响应

Fiber(基于 fasthttp)中实现流式响应,需要使用 底层的流式写入接口,使得服务器可以边生成边发送数据,而不是一次性发送完整响应

应用场景示例

  • OpenAI 的流式 Chat Completion 接口;
  • Server-Sent Events(SSE);
  • 实时日志输出;
  • 音视频分片推送;
  • 大文件边处理边发送;

使用SetBodyStreamWriter实现

1
2
3
4
5
6
7
8
9
10
11
12
app.Get("/stream", func(ctx *fiber.Ctx) error {
ctx.Set("Content-Type", "text/event-stream") // 或 application/octet-stream
ctx.Context().SetBodyStreamWriter(func(w *bufio.Writer) {
for i := 0; i < 5; i++ {
// SSE格式:data: ...\n\n
fmt.Fprintf(w, "data: message %d\n\n", i)
w.Flush() // ⚠️ 必须刷新,否则客户端收不到
time.Sleep(1 * time.Second)
}
})
return nil
})

使用SetBodyStream+io.Pipe实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
app.Get("/stream", func(ctx *fiber.Ctx) error {
pr, pw := io.Pipe() // 管道:一个 reader,一个 writer

// 设置响应头:使用 chunked 传输
ctx.Set("Content-Type", "text/event-stream")
ctx.Response().SetBodyStream(pr, -1) // -1 表示 Transfer-Encoding: chunked

// 在另一个 goroutine 中写入内容(模拟流式数据)
go func() {
for i := 0; i < 5; i++ {
fmt.Fprintf(pw, "data: message %d\n\n", i)
time.Sleep(1 * time.Second)
}
pw.Close() // 结束写入(很重要)
}()

return nil
})

[!CAUTION]

为什么不能用 bytes.NewReader 来流式返回?

1
2
reader := bytes.NewReader([]byte("hello\nworld"))
ctx.Response().SetBodyStream(reader, -1)

bytes.NewReader 是完全内存里的数据,fasthttp 调用 io.Copy() 时一下子就读取完了。

二者的关键区别在于 reader.Read() 的行为:是否懒读取、是否阻塞。

  • bytes.NewReader 会立刻把所有数据都“读完”,fasthttp 会缓存后统一发送。
  • io.Pipe()PipeReader.Read()阻塞直到写入数据,fasthttp 能边读边发。

关键对比:reader.Read() 行为差异

bytes.NewReader([]byte(...))
  • 是内存 reader;
  • Read() 调用时,数据已经全部存在;
  • 所以 fasthttp 通过 io.Copy() 立刻读完所有内容
  • fasthttp 现在掌握了完整内容,就缓存起来,一次性写入 socket ➜ 非流式
  • 虽然你设了 chunked,但并没有任何“流”或“等待”发生。
io.Pipe()
  • PipeReader 是一个真正的管道;
  • pr.Read() 如果没有人 pw.Write(),就会阻塞在那里;
  • fasthttp 调用 io.Copy() 时:
    • 读一点(当有数据);
    • 立即通过 chunked 写一点;
    • 再继续等写入(写入线程继续 pw.Write());
  • 于是形成了:边写边读边发 ➜ 真正的流式传输。

评论