Bring in gRPC: gRPC Streaming, Client and Server

Original address: Bring in gRPC: gRPC Streaming, Client and Server

Project address: go-grpc-example

Preface

This section introduces gRPC streaming, which comes in three types:

  • Server-side streaming RPC
  • Client-side streaming RPC
  • Bidirectional streaming RPC

Streaming

Every technology exists because it addresses a pain point. If you want to understand gRPC's streaming calls, read on.

Figure

image

gRPC Streaming is built on HTTP/2, which will be explained in detail in later chapters.

Why not use Simple RPC?

Why does streaming exist? Is there anything wrong with Simple RPC? Simulating a business scenario reveals the following problems with Simple RPC:

  • Large data packets cause instantaneous pressure
  • The receiver must successfully and correctly receive the entire data packet before it can invoke the callback and start business processing (the server cannot process while the client is still sending)

Why Streaming RPC

  • Large data packets
  • Real-time scenarios

Simulated scenario

Every morning at 6 o’clock, a data set with millions of records is synchronized from A to B. During synchronization, a series of operations (archiving, data analysis, user profiling, logging, etc.) is performed. The amount of data involved in a single run is very large.

Once the synchronization finishes, people immediately start querying the data to prepare for the new day, so the scenario also has a real-time requirement.

Comparing the two approaches, Streaming RPC is the better fit for this scenario.
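To make the "instantaneous pressure" point concrete, here is a minimal sketch of cutting a large record set into small batches, which is the shape of work a streaming sender does before handing each batch to the stream. The batches function, the int record type, and the batch size of 1000 are illustrative assumptions and are not part of go-grpc-example.

```go
package main

import "fmt"

// batches splits records into consecutive slices of at most size elements.
// It is a hypothetical helper: each returned slice would be sent as one
// stream message instead of sending everything in a single request.
func batches(records []int, size int) [][]int {
	if size <= 0 {
		return nil
	}
	var out [][]int
	for start := 0; start < len(records); start += size {
		end := start + size
		if end > len(records) {
			end = len(records)
		}
		out = append(out, records[start:end])
	}
	return out
}

func main() {
	records := make([]int, 1000000) // stand-in for the million-record data set
	parts := batches(records, 1000)
	fmt.Println("number of messages:", len(parts))
}
```

With batches of this kind, the receiver can start archiving or analysing the first batch while later ones are still in flight, which a single Simple RPC call cannot do.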

gRPC

The explanation of the gRPC streaming code focuses on the first section, because the three modes are really just different combinations of the same building blocks. Concentrate on understanding the first one and apply the same reasoning to the others.

Directory structure

$ tree go-grpc-example 
go-grpc-example
├── client
│   ├── simple_client
│   │   └── client.go
│   └── stream_client
│       └── client.go
├── proto
│   ├── search.proto
│   └── stream.proto
└── server
    ├── simple_server
    │   └── server.go
    └── stream_server
        └── server.go

The stream_server and stream_client directories are added to hold the server and client files, and proto/stream.proto holds the IDL.

IDL

In the stream.proto file under the proto folder, write the following:

syntax = "proto3";

package proto;

service StreamService {
    rpc List(StreamRequest) returns (stream StreamResponse) {};

    rpc Record(stream StreamRequest) returns (StreamResponse) {};

    rpc Route(stream StreamRequest) returns (stream StreamResponse) {};
}


message StreamPoint {
  string name = 1;
  int32 value = 2;
}

message StreamRequest {
  StreamPoint pt = 1;
}

message StreamResponse {
  StreamPoint pt = 1;
}

Note the stream keyword, which declares a method as a streaming method. Three methods are involved here, corresponding to:

  • List: server-side streaming RPC
  • Record: client-side streaming RPC
  • Route: bidirectional streaming RPC

Basic template and empty definitions

Server

package main

import (
    "log"
    "net"

    "google.golang.org/grpc"

    pb "github.com/EDDYCJY/go-grpc-example/proto"
    
)

type StreamService struct{}

const (
    PORT = "9002"
)

func main() {
    server := grpc.NewServer()
    pb.RegisterStreamServiceServer(server, &StreamService{})

    lis, err := net.Listen("tcp", ":"+PORT)
    if err != nil {
        log.Fatalf("net.Listen err: %v", err)
    }

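    // Serve blocks until the listener fails, so report its error instead of dropping it.
    if err := server.Serve(lis); err != nil {
        log.Fatalf("server.Serve err: %v", err)
    }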
}

func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {
    return nil
}

func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {
    return nil
}

func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {
    return nil
}

Before writing the logic, it is recommended to set up the basic gRPC server template with empty interface implementations first. If anything is unclear, refer to the previous chapter.

Client

package main

import (
    "log"
    
    "google.golang.org/grpc"

    pb "github.com/EDDYCJY/go-grpc-example/proto"
)

const (
    PORT = "9002"
)

func main() {
    conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())
    if err != nil {
        log.Fatalf("grpc.Dial err: %v", err)
    }

    defer conn.Close()

    client := pb.NewStreamServiceClient(conn)

    err = printLists(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: List", Value: 2018}})
    if err != nil {
        log.Fatalf("printLists.err: %v", err)
    }

    err = printRecord(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: Record", Value: 2018}})
    if err != nil {
        log.Fatalf("printRecord.err: %v", err)
    }

    err = printRoute(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: Route", Value: 2018}})
    if err != nil {
        log.Fatalf("printRoute.err: %v", err)
    }
}

func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error {
    return nil
}

func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {
    return nil
}

func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {
    return nil
}

I. Server-side streaming RPC

Server-side streaming RPC is a one-way stream: the server streams its response, while the client makes an ordinary RPC request.

Simply put, the client initiates one ordinary RPC request, the server sends the data set in several pieces through a streaming response, and the client receives them with Recv. Roughly as shown:

image

Server

func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {
    for n := 0; n <= 6; n++ {
        err := stream.Send(&pb.StreamResponse{
            Pt: &pb.StreamPoint{
                Name:  r.Pt.Name,
                Value: r.Pt.Value + int32(n),
            },
        })
        if err != nil {
            return err
        }
    }

    return nil
}

On the server, pay attention to the stream.Send method. It looks like it can be called any number of times, but is there a size limit?

type StreamService_ListServer interface {
    Send(*StreamResponse) error
    grpc.ServerStream
}

func (x *streamServiceListServer) Send(m *StreamResponse) error {
    return x.ServerStream.SendMsg(m)
}

Reading the source code shows that protoc generates the standard interface methods from the definitions. In the end they all delegate to the internal SendMsg method, which involves the following steps:

  • Serialize the message body (object)
  • Compress the serialized message body
  • Prepend a 5-byte header to the message body being transmitted
  • Check whether the total byte length of the compressed, serialized message body exceeds the preset maxSendMessageSize (the default value is math.MaxInt32), and return an error if it does
  • Write the data to the stream

Client

func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error {
    stream, err := client.List(context.Background(), r)
    if err != nil {
        return err
    }

    for {
        resp, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }

        log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)
    }

    return nil
}

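In the client, pay attention to the stream.Recv() method. Under what circumstances does it return io.EOF? Under what circumstances does it return an error? (The function above also needs the context and io packages imported in client.go.)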

type StreamService_ListClient interface {
    Recv() (*StreamResponse, error)
    grpc.ClientStream
}

func (x *streamServiceListClient) Recv() (*StreamResponse, error) {
    m := new(StreamResponse)
    if err := x.ClientStream.RecvMsg(m); err != nil {
        return nil, err
    }
    return m, nil
}

RecvMsg reads one complete gRPC message body from the stream. Reading the source code also shows that:

(1) RecvMsg blocks while waiting

(2) RecvMsg returns io.EOF when the stream ends successfully (Close is called)

(3) When any error occurs on the stream, the stream is aborted and the error carries an RPC status code. RecvMsg may return the following errors:

  • io.EOF
  • io.ErrUnexpectedEOF
  • transport.ConnectionError
  • status errors with a code from google.golang.org/grpc/codes

Also note that the default MaxReceiveMessageSize is 1024 * 1024 * 4 bytes (4 MB); it is recommended not to exceed it

Verification

Run stream_server/server.go:

$ go run server.go

Run stream_client/client.go:

$ go run client.go 
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2018
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2019
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2020
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2021
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2022
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2023
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2024

II. Client-side streaming RPC

Client-side streaming RPC is a one-way stream: the client sends multiple RPC requests to the server through the stream, and the server sends a single response back to the client, roughly as shown:

image

Server

func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {
    for {
        r, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&pb.StreamResponse{Pt: &pb.StreamPoint{Name: "gRPC Stream Server: Record", Value: 1}})
        }
        if err != nil {
            return err
        }

        log.Printf("stream.Recv pt.name: %s, pt.value: %d", r.Pt.Name, r.Pt.Value)
    }
}

Here is a method we have not seen before: stream.SendAndClose. What is it for?

In this handler we process each Recv. Once io.EOF is returned (the client has closed the stream), the final response must be sent to the client and the Recv waiting on the other side must be released at the same time; SendAndClose does both.

Client

func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {
    stream, err := client.Record(context.Background())
    if err != nil {
        return err
    }

    for n := 0; n < 6; n++ {
        err := stream.Send(r)
        if err != nil {
            return err
        }
    }

    resp, err := stream.CloseAndRecv()
    if err != nil {
        return err
    }

    log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)

    return nil
}

stream.CloseAndRecv and stream.SendAndClose are a matching pair of stream methods: the client half-closes its sending side and waits for the single response that the server sends with SendAndClose.
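The pairing can be mimicked with plain Go channels, which may help if the handshake is still unclear. This is an analogy only and does not use the gRPC API: record and sendAll are hypothetical names, closing the request channel stands in for the client half-closing the stream, and the single value on the response channel stands in for SendAndClose.

```go
package main

import "fmt"

// record plays the server handler: it consumes requests until the request
// channel is closed (the analogue of Recv returning io.EOF), then sends
// exactly one summary response (the analogue of SendAndClose).
func record(requests <-chan int, response chan<- int) {
	count := 0
	for range requests {
		count++
	}
	response <- count
}

// sendAll plays the client: it sends every value, signals the end of its
// stream, and then blocks for the single reply (the analogue of CloseAndRecv).
func sendAll(values []int) int {
	requests := make(chan int)
	response := make(chan int, 1)
	go record(requests, response)

	for _, v := range values {
		requests <- v
	}
	close(requests)   // no more requests will be sent
	return <-response // wait for the one and only response
}

func main() {
	received := sendAll([]int{2018, 2018, 2018, 2018, 2018, 2018})
	fmt.Println("server received:", received) // prints "server received: 6"
}
```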

Verification

Restart stream_server/server.go and run stream_client/client.go again:

stream_client:
$ go run client.go
2018/09/24 16:23:03 resp: pj.name: gRPC Stream Server: Record, pt.value: 1
stream_server:
$ go run server.go
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018

III. Bidirectional streaming RPC

Bidirectional streaming RPC, as the name implies, streams in both directions: the client sends its requests as a stream, and the server responds as a stream as well.

The first request must be initiated by the client, but the specific interaction pattern (who sends first, how much is sent at a time, how many responses come back, and when the stream is closed) is determined by how the program is written (it can be combined with goroutines).

Assuming that the two sides of the bidirectional stream send in turn, it looks roughly like this:

image

Still, it should be stressed that bidirectional streams vary greatly depending on how the program is written, so this diagram does not apply to every scenario.
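As one example of a different interaction pattern, the sketch below sends and receives concurrently instead of strictly taking turns. It uses channels and goroutines as stand-ins for the two directions of the stream and is not gRPC code: echoServer and exchange are hypothetical names, and closing the in channel plays the role of CloseSend.

```go
package main

import (
	"fmt"
	"sync"
)

// echoServer stands in for a bidirectional handler: it answers every
// request until the client closes its side, then closes the reply side.
func echoServer(in <-chan int, out chan<- int) {
	for v := range in {
		out <- v + 1
	}
	close(out)
}

// exchange sends all values while a separate goroutine collects replies,
// so sending never has to wait for the caller to process a response.
func exchange(values []int) []int {
	in := make(chan int)
	out := make(chan int)
	go echoServer(in, out)

	var replies []int
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // receiving side, analogous to a Recv loop in its own goroutine
		defer wg.Done()
		for r := range out {
			replies = append(replies, r)
		}
	}()

	for _, v := range values {
		in <- v
	}
	close(in) // analogous to CloseSend: no more requests
	wg.Wait() // wait until the reply side has been drained
	return replies
}

func main() {
	fmt.Println(exchange([]int{0, 1, 2})) // prints "[1 2 3]"
}
```

The example that follows in this article uses the simpler alternating pattern, where each side sends one message and then waits for the other.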

Server

func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {
    n := 0
    for {
        err := stream.Send(&pb.StreamResponse{
            Pt: &pb.StreamPoint{
                Name:  "gPRC Stream Client: Route",
                Value: int32(n),
            },
        })
        if err != nil {
            return err
        }

        r, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }

        n++

        log.Printf("stream.Recv pt.name: %s, pt.value: %d", r.Pt.Name, r.Pt.Value)
    }
}

Client

func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {
    stream, err := client.Route(context.Background())
    if err != nil {
        return err
    }

    for n := 0; n <= 6; n++ {
        err = stream.Send(r)
        if err != nil {
            return err
        }

        resp, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }

        log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)
    }

    stream.CloseSend()

    return nil
}

Verification

Restart stream_server/server.go and run stream_client/client.go again:

stream_server
$ go run server.go
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
stream_client
$ go run client.go
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 0
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 1
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 2
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 3
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 4
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 5
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 6

Summary

This article introduced three streaming interaction modes. Choose among them according to your actual business scenario, and you will get twice the result with half the effort.
