Sorry, your browser cannot access this site
This page requires browser support (enable) JavaScript
Learn more >

GRPC流的使用

流传输方式简介

客户端流

客户端发送多次,服务端响应一次(关闭时响应)。

服务端流

客户端先发送一次,服务端响应多次。

双向流

客户端发送,服务端响应。

客户端流

服务定义

1
rpc ClientStreamPing(stream PingRequest) returns (PingReply);

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
stream, err := client.SayStream(context.Background())
err = stream.Send(&pd.HelloReq{
Name: "111",
Age: 3,
})
if err != nil {
fmt.Printf(err.Error())
}
stream.Send(&pd.HelloReq{
Name: "111",
Age: 4,
})
stream.Send(&pd.HelloReq{
Name: "111",
Age: 5,
})
recv, err := stream.CloseAndRecv()
if err != nil {
return
}
fmt.Printf("接受:%s", recv.Say)
  1. 创建客户端之后,调用服务生成流
  2. 使用流发送消息
  3. 关闭流并接受服务端的响应

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (s *Server) SayStream(stream pd.HelloServer_SayStreamServer) error {
for {
//源源不断的去接收客户端发来的信息
recv, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
return err
}

fmt.Println("服务端接收到的流", recv.Name, recv.Age)
}
return stream.SendAndClose(&pd.HelloRep{Say: "结束"})
}
  1. 实现流的接口
  2. 接受客户端发送的消息
  3. 正常关闭时发送响应消息

服务端流

服务定义

1
rpc ServerStreamPing(PingRequest) returns (stream PingReply);

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
stream, err := client.SayStreamServer(context.Background(), &pd.HelloReq{
Name: "111",
Age: 0,
})
for {
recv, err := stream.Recv()
if err != nil {
if err == io.EOF {
fmt.Println("客户端数据接收完成")
err := stream.CloseSend()
if err != nil {
log.Fatal(err)
}
break
}
log.Fatal(err)
}
fmt.Println("客户端收到的流", recv.Say)
}

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (s *Server) SayStreamServer(req *pd.HelloReq, stream pd.HelloServer_SayStreamServerServer) error {
count := 0
fmt.Printf(req.Name, req.Age)
for {
rsp := &pd.HelloRep{Say: "server send"}
err := stream.Send(rsp)
if err != nil {
return err
}
time.Sleep(time.Second)
count++
if count > 10 {
return nil
}
}
}

双向流

服务定义

1
rpc SayDoubleStream(stream helloReq) returns (stream helloRep) {};

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
stream, err := client.SayDoubleStream(context.Background())
var count int32 = 22
for {
request := &pd.HelloReq{
Name: "zzz",
Age: count,
}
err = stream.Send(request)
count++
if err != nil {
log.Fatal(err)
}
time.Sleep(time.Second)
recv, err := stream.Recv()
if err != nil {
log.Fatal(err)
}
//websocket
fmt.Println("客户端收到的流信息", recv.Say)
}

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (s *Server) SayDoubleStream(stream pd.HelloServer_SayDoubleStreamServer) error {
for {
recv, err := stream.Recv()
if err != nil {
return nil
}
fmt.Println("服务端收到客户端的消息", recv.Name, recv.Age)
time.Sleep(time.Second)
rsp := &pd.HelloRep{Say: fmt.Sprintf("%s的年龄:%d", recv.Name, recv.Age)}
err = stream.Send(rsp)
if err != nil {
return nil
}
}
}

评论