Sorry, your browser cannot access this site
This page requires browser support (enable) JavaScript
Learn more >

gateway设计与开发

项目背景

微服务API网关可反向代理内部的服务,保证内部服务的不可见。作为系统的统一入口,再此基础之上可以实现负载均衡等、限流、路由和限流等功能。

在大模型应用背景下,通过K8s pod 运行VLLM等框架部署大模型,使用OpenAI的标准接口调用。统一个模型可以启多个模型应用,同时为了支持不同的模型请求,屏蔽底层调用的差异,实现请求调用的监控统计。为了实现以上的功能,开发网关项目。

其主要功能有:

  1. apikey校验
  2. 请求转发
  3. token统计
  4. 熔断限流

项目实现

使用fiber框架实现网关,而fiber基于fasthttp实现,fasthttp引入协程池,对比net/http具有更高的性能。但截至当前暂不支持http2,需要配合其他包实现,在不使用grpc前提下,还是具有可靠的选择性。

核心结构

网关架构.drawio

执行步骤:

  1. 用户使用标准OpenAI接口调用模型,向网关发送请求
  2. 网关获取请求之后,校验apikey并根据请求参数找到真实的模型地址
  3. 网关创建一个客户端,使用客户端请求模型地址
  4. 客户端得到响应之后,根据需求分析响应的内容(计算token)
  5. 客户端将响应赋值给网关的响应
  6. 网关返回响应给用户

客户端结构

gateway流程图.drawio

上图是项目gateway中client的实现,具体实现可以参考在项目中的内容。

重点分析:大模型的调用一般返回的都是流式响应,客户端获取的响应和网关返回给用户的响应都是流式数据,在fasthttp的实现中,响应的读取使用io read实现,读取客户端的响应就会将响应读取完,不能赋值给网关的响应。具体的请求转发逻辑的实现,在那个过程实现。可以封装fasthttp client在其之上实现,也可以直接在网关层实现。

问题解决

流式返回的内容在网关读取的同时,也需要返回给用户。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
func (c *client) Do(ctx *fiber.Ctx) error {
//to do
//选择器
n := c.applier.nodes[0]
apikey := ctx.Get("Authorization")[7:]
modelName := ctx.Locals("modelName").(string)
reqBody := tools.Analysis(ctx)

clientReq := fasthttp.AcquireRequest()
clientRes := fasthttp.AcquireResponse()
resp := ctx.Response()
req := ctx.Request()
resp.StreamBody = true
clientRes.StreamBody = true
clientReq = ctx.Request()
originalURL := utils.CopyString(string(req.RequestURI()))
defer clientReq.SetRequestURI(originalURL)
copiedURL := utils.CopyString(n.address)
clientReq.SetRequestURI(copiedURL)
if scheme := getScheme(utils.UnsafeBytes(copiedURL)); len(scheme) > 0 {
clientReq.URI().SetSchemeBytes(scheme)
}
err := n.client.Do(clientReq, clientRes)
if err != nil {
GatewayLog.Error().Str("ERROR_CODE", e.GetMsg(e.ERROR)).Msg(err.Error())
return err
}
bodyStream := clientRes.BodyStream()
if bodyStream == nil {
return errors.New("response body stream is nil")
}
pr, pw := io.Pipe()
startTime := time.Now()
EndMessage := ""
CompletionTokens := 0

go func() {
defer func() {
err := pw.Close()
if err != nil {
GatewayLog.Error().Str("ERROR_CODE", e.GetMsg(e.ERROR)).Msg(err.Error())
}
err = clientRes.CloseBodyStream()
if err != nil {
GatewayLog.Error().Str("ERROR_CODE", e.GetMsg(e.ERROR)).Msg(err.Error())
}
}()
buf := make([]byte, 1024)
for {
n, readErr := bodyStream.Read(buf)
if readErr != nil && readErr != io.EOF {
GatewayLog.Error().Str("ERROR_CODE", e.GetMsg(e.ERROR)).Msg(readErr.Error())
return
}
_, writeErr := pw.Write(buf[:n])
if writeErr != nil {
GatewayLog.Error().Str("ERROR_CODE", e.GetMsg(e.ERROR)).Msg(writeErr.Error())
return
}
//fmt.Printf("%s", string(buf[:n]))
CompletionTokens += 1
if readErr == io.EOF {
EndMessage = string(buf[:n])
break
}
}
endTime := time.Now()

go Handler(apikey,
modelName,
endTime.Sub(startTime),
EndMessage,
CompletionTokens,
reqBody,
clientRes.StatusCode(),
)

}()
ctx.Set("Content-Type", "text/event-stream")
resp.SetBodyStream(pr, -1)
return nil
}

在代码中使用pr, pw := io.Pipe()管道从clientRes.BodyStream中读取流式数据并做相关的处理,同时将读到的内容写到 pw.Write(buf[:n]),网关从管道中读到网关的响应resp.SetBodyStream(pr, -1)

评论