双向流式RPC即客户端和服务端均为流式的RPC,能发送多个请求对象也能接收到多个响应对象。典型应用示例:聊天应用等。
我们这里还是编写一个客户端和服务端进行人机对话的双向流式RPC示例。
1.定义服务
// 双向流式数据
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
修改.proto文件后,需要重新使用 protocol buffers编译器生成客户端和服务端代码。
2.服务端实现BidiHello方法。
func (s * server) BidHello(stream pb.Greeter_BidiHelloServer)error{
for {
//接收流式请求
in, err := stream.Recv()if err == io.EOF {return nil}if err != nil {return err}reply := magic(in.GetName()) // 对收到的数据做些处理// 返回流式响应if err := stream.Send(&pb.HelloResponse{Reply: reply}); err != nil {return err}
}
}
这里我们还定义了一个处理数据的magic函数,其内容如下。
// magic 一段价值连城的“人工智能”代码
func magic(s string) string {s = strings.ReplaceAll(s, "吗", "")s = strings.ReplaceAll(s, "吧", "")s = strings.ReplaceAll(s, "你", "我")s = strings.ReplaceAll(s, "?", "!")s = strings.ReplaceAll(s, "?", "!")return s
}
3.客户端调用BidiHello方法,一边从终端获取输入的请求数据发送至服务端,一边从服务端接收流式响应。
func runBidHello(c pb.GreeterClient){
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)defer cancel()// 双向流模式stream, err := c.BidiHello(ctx)if err != nil {log.Fatalf("c.BidiHello failed, err: %v", err)}waitc := make(chan struct{})go func() {for {// 接收服务端返回的响应in, err := stream.Recv()if err == io.EOF {// read done.close(waitc)return}if err != nil {log.Fatalf("c.BidiHello stream.Recv() failed, err: %v", err)}fmt.Printf("AI:%s\n", in.GetReply())}}()// 从标准输入获取用户输入reader := bufio.NewReader(os.Stdin) // 从标准输入生成读对象for {cmd, _ := reader.ReadString('\n') // 读到换行cmd = strings.TrimSpace(cmd)if len(cmd) == 0 {continue}if strings.ToUpper(cmd) == "QUIT" {break}// 将获取到的数据发送至服务端if err := stream.Send(&pb.HelloRequest{Name: cmd}); err != nil {log.Fatalf("c.BidiHello stream.Send(%v) failed: %v", cmd, err)}}stream.CloseSend()<-waitc
}
流式RPC调用metadata示例
这里以双向流式RPC为例演示客户端和服务端如何进行metadata操作。
client端的metadata操作
下面的代码片段演示了client端在服务端流式RPC模式下如何设置和获取metadata。
// bidirectionalWithMetadata 流式RPC调用客户端metadata操作
func bidirectionalWithMetadata(c pb.GreeterClient, name string) {// 创建metadata和context.md := metadata.Pairs("token", "app-test-q1mi")ctx := metadata.NewOutgoingContext(context.Background(), md)// 使用带有metadata的context执行RPC调用.stream, err := c.BidiHello(ctx)if err != nil {log.Fatalf("failed to call BidiHello: %v\n", err)}go func() {// 当header到达时读取header.header, err := stream.Header()if err != nil {log.Fatalf("failed to get header from stream: %v", err)}// 从返回响应的header中读取数据.if l, ok := header["location"]; ok {fmt.Printf("location from header:\n")for i, e := range l {fmt.Printf(" %d. %s\n", i, e)}} else {log.Println("location expected but doesn't exist in header")return}// 发送所有的请求数据到server.for i := 0; i < 5; i++ {if err := stream.Send(&pb.HelloRequest{Name: name}); err != nil {log.Fatalf("failed to send streaming: %v\n", err)}}stream.CloseSend()}()
// 读取所有的响应.var rpcStatus errorfmt.Printf("got response:\n")for {r, err := stream.Recv()if err != nil {rpcStatus = errbreak}fmt.Printf(" - %s\n", r.Reply)}if rpcStatus != io.EOF {log.Printf("failed to finish server streaming: %v", rpcStatus)return}// 当RPC结束时读取trailertrailer := stream.Trailer()// 从返回响应的trailer中读取metadata.if t, ok := trailer["timestamp"]; ok {fmt.Printf("timestamp from trailer:\n")for i, e := range t {fmt.Printf(" %d. %s\n", i, e)}} else {log.Printf("timestamp expected but doesn't exist in trailer")}
}
server端的metadata操作
下面的代码片段演示了server端在服务端流式RPC模式下设置和操作metadata。
// BidirectionalStreamingSayHello 流式RPC调用客户端metadata操作
func (s *server) BidirectionalStreamingSayHello(stream pb.Greeter_BidiHelloServer) error {// 在defer中创建trailer记录函数的返回时间.defer func() {trailer := metadata.Pairs("timestamp", strconv.Itoa(int(time.Now().Unix())))stream.SetTrailer(trailer)}()// 从client读取metadata.md, ok := metadata.FromIncomingContext(stream.Context())if !ok {return status.Errorf(codes.DataLoss, "BidirectionalStreamingSayHello: failed to get metadata")}if t, ok := md["token"]; ok {fmt.Printf("token from metadata:\n")for i, e := range t {fmt.Printf(" %d. %s\n", i, e)}}// 创建和发送header.header := metadata.New(map[string]string{"location": "X2Q"})stream.SendHeader(header)// 读取请求数据发送响应数据.for {in, err := stream.Recv()if err == io.EOF {return nil}if err != nil {return err}fmt.Printf("request received %v, sending reply\n", in)if err := stream.Send(&pb.HelloResponse{Reply: in.Name}); err != nil {return err}}
}