Part-3: Building a basic microservice with bidirectional-streaming gRPC using Golang

Objective

Introduction

  • both sides send a sequence of messages using a read-write stream
  • the two streams operate independently, so clients and servers can read and write in whatever order they like
  • for example, the server could wait to receive all the client messages before writing its responses, or it could alternately read a message then write a message, or some other combination of reads and writes

Implementing Bidirectional Streaming RPC

Update the protobuf

service Machine {
- rpc Execute(stream Instruction) returns (Result) {}
- rpc ServerStreamingExecute(InstructionSet) returns (stream Result) {}
+ rpc Execute(stream Instruction) returns (stream Result) {}
}
source: machine/machine.proto

Generating the updated client and server interface Go code

~/disk/E/workspace/grpc-eg-go
$ SRC_DIR=./
$ DST_DIR=$SRC_DIR
$ protoc \
-I=$SRC_DIR \
--go_out=plugins=grpc:$DST_DIR \
$SRC_DIR/machine/machine.proto
... type MachineServer interface {
Execute(Machine_ExecuteServer) error
- ServerStreamingExecute(*InstructionSet, Machine_ServerStreamingExecuteServer) error
}
... type MachineClient interface {
Execute(ctx context.Context, opts ...grpc.CallOption) (Machine_ExecuteClient, error)
- ServerStreamingExecute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (Machine_ServerStreamingExecuteClient, error)
}
... source: machine/machine.pb.go

Update the Server

func (s *MachineServer) Execute(stream machine.Machine_ExecuteServer) error {
var stack stack.Stack
for {
instruction, err := stream.Recv()
if err == io.EOF {
log.Println("EOF")
return nil
}
if err != nil {
return err
}
operand := instruction.GetOperand()
operator := instruction.GetOperator()
op_type := OperatorType(operator)
fmt.Printf("Operand: %v, Operator: %v\n", operand, operator) switch op_type {
case PUSH:
stack.Push(float32(operand))
case POP:
stack.Pop()
case ADD, SUB, MUL, DIV:
item2, popped := stack.Pop()
item1, popped := stack.Pop()
if !popped {
return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted")
}
var res float32
if op_type == ADD {
res = item1 + item2
} else if op_type == SUB {
res = item1 - item2
} else if op_type == MUL {
res = item1 * item2
} else if op_type == DIV {
res = item1 / item2
}
stack.Push(res)
if err := stream.Send(&machine.Result{Output: float32(res)}); err != nil {
return err
}
case FIB:
n, popped := stack.Pop()
if !popped {
return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted")
}
if op_type == FIB {
for f := range utils.FibonacciRange(int(n)) {
if err := stream.Send(&machine.Result{Output: float32(f)}); err != nil {
return err
}
}
}
default:
return status.Errorf(codes.Unimplemented, "Operation '%s' not implemented yet", operator)
}
}
}
source: server/machine.go

Update the Client

func runExecute(client machine.MachineClient, instructions []*machine.Instruction) {
log.Printf("Streaming %v", instructions)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.Execute(ctx)
if err != nil {
log.Fatalf("%v.Execute(ctx) = %v, %v: ", client, stream, err)
}
waitc := make(chan struct{})
go func() {
for {
result, err := stream.Recv()
if err == io.EOF {
log.Println("EOF")
close(waitc)
return
}
if err != nil {
log.Printf("Err: %v", err)
}
log.Printf("output: %v", result.GetOutput())
}
}()
for _, instruction := range instructions {
if err := stream.Send(instruction); err != nil {
log.Fatalf("%v.Send(%v) = %v: ", stream, instruction, err)
}
time.Sleep(500 * time.Millisecond)
}
if err := stream.CloseSend(); err != nil {
log.Fatalf("%v.CloseSend() got error %v, want %v", stream, err, nil)
}
<-waitc
}
source: client/machine.go

Test

~/disk/E/workspace/grpc-eg-go
$ mockgen github.com/toransahu/grpc-eg-go/machine MachineClient,Machine_ExecuteServer,Machine_ExecuteClient > mock_machine/machine_mock.go

Server

func TestExecute(t *testing.T) {
s := MachineServer{}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockServerStream := mock_machine.NewMockMachine_ExecuteServer(ctrl)
mockResults := []*machine.Result{}
callRecv1 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operand: 1, Operator: "PUSH"}, nil)
callRecv2 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operand: 2, Operator: "PUSH"}, nil).After(callRecv1)
callRecv3 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operator: "MUL"}, nil).After(callRecv2)
callRecv4 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operand: 3, Operator: "PUSH"}, nil).After(callRecv3)
callRecv5 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operator: "ADD"}, nil).After(callRecv4)
callRecv6 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operator: "FIB"}, nil).After(callRecv5)
mockServerStream.EXPECT().Recv().Return(nil, io.EOF).After(callRecv6)
mockServerStream.EXPECT().Send(gomock.Any()).DoAndReturn(
func(result *machine.Result) error {
mockResults = append(mockResults, result)
return nil
}).AnyTimes()
wants := []float32{2, 5, 0, 1, 1, 2, 3, 5}
err := s.Execute(mockServerStream)
if err != nil {
t.Errorf("Execute(%v) got unexpected error: %v", mockServerStream, err)
}
for i, result := range mockResults {
got := result.GetOutput()
want := wants[i]
if got != want {
t.Errorf("got %v, want %v", got, want)
}
}
}
source: server/machine_test.go
~/disk/E/workspace/grpc-eg-go
$ go test server/machine.go server/machine_test.go
ok command-line-arguments 0.004s

Client

func testExecute(t *testing.T, client machine.MachineClient) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
instructions := []*machine.Instruction{}
instructions = append(instructions, &machine.Instruction{Operand: 5, Operator: "PUSH"})
instructions = append(instructions, &machine.Instruction{Operand: 6, Operator: "PUSH"})
instructions = append(instructions, &machine.Instruction{Operator: "MUL"})
stream, err := client.Execute(ctx)
if err != nil {
log.Fatalf("%v.Execute(%v) = _, %v: ", client, ctx, err)
}
for _, instruction := range instructions {
if err := stream.Send(instruction); err != nil {
log.Fatalf("%v.Send(%v) = %v: ", stream, instruction, err)
}
}
result, err := stream.Recv()
if err != nil {
log.Fatalf("%v.Recv() got error %v, want %v", stream, err, nil)
}
got := result.GetOutput()
want := float32(30)
if got != want {
t.Errorf("got %v, want %v", got, want)
}
}
func TestExecute(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockMachineClient := mock_machine.NewMockMachineClient(ctrl)
mockClientStream := mock_machine.NewMockMachine_ExecuteClient(ctrl)
mockClientStream.EXPECT().Send(gomock.Any()).Return(nil).AnyTimes()
mockClientStream.EXPECT().Recv().Return(&machine.Result{Output: 30}, nil)
mockMachineClient.EXPECT().Execute(
gomock.Any(), // context
).Return(mockClientStream, nil)
testExecute(t, mockMachineClient)
}
source: mock_machine/machine_mock_test.go
~/disk/E/workspace/grpc-eg-go
$ go test mock_machine/machine_mock.go mock_machine/machine_mock_test.go
ok command-line-arguments 0.003s

Run

Server

~/disk/E/workspace/grpc-eg-go
$ go run cmd/run_machine_server.go

Client

~/disk/E/workspace/grpc-eg-go
$ go run client/machine.go
Streaming [operator:"PUSH" operand:1 operator:"PUSH" operand:2 operator:"ADD" operator:"PUSH" operand:3 operator:"DIV" operator:"PUSH" operand:4 operator:"MUL" operator:"FIB" operator:"PUSH" operand:5 operator:"PUSH" operand:6 operator:"SUB" ]
output: 3
output: 1
output: 4
output: 0
output: 1
output: 1
output: 2
output: 3
output: -1
EOF

Bonus

  1. how many dependencies are there
  2. which are the essential & most used dependencies
  3. is it feasible to install dependencies on the test (and even the developer’s) environment, etc.
const bufSize = 1024 * 1024var lis *bufconn.Listenerfunc init() {
lis = bufconn.Listen(bufSize)
s := grpc.NewServer()
machine.RegisterMachineServer(s, &MachineServer{})
go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("Server exited with error: %v", err)
}
}()
}
func bufDialer(context.Context, string) (net.Conn, error) {
return lis.Dial()
}
func testExecute_Live(t *testing.T, client machine.MachineClient, instructions []*machine.Instruction, wants []float32) {
log.Printf("Streaming %v", instructions)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.Execute(ctx)
if err != nil {
log.Fatalf("%v.Execute(ctx) = %v, %v: ", client, stream, err)
}
waitc := make(chan struct{})
go func() {
i := 0
for {
result, err := stream.Recv()
if err == io.EOF {
log.Println("EOF")
close(waitc)
return
}
if err != nil {
log.Printf("Err: %v", err)
}
log.Printf("output: %v", result.GetOutput())
got := result.GetOutput()
want := wants[i]
if got != want {
t.Errorf("got %v, want %v", got, want)
}
i++
}
}()
for _, instruction := range instructions {
if err := stream.Send(instruction); err != nil {
log.Fatalf("%v.Send(%v) = %v: ", stream, instruction, err)
}
}
if err := stream.CloseSend(); err != nil {
log.Fatalf("%v.CloseSend() got error %v, want %v", stream, err, nil)
}
<-waitc
}
func TestExecute_Live(t *testing.T) {
ctx := context.Background()
conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure())
if err != nil {
t.Fatalf("Failed to dial bufnet: %v", err)
}
defer conn.Close()
client := machine.NewMachineClient(conn)
// try Execute()
instructions := []*machine.Instruction{
{Operand: 1, Operator: "PUSH"},
{Operand: 2, Operator: "PUSH"},
{Operator: "ADD"},
{Operand: 3, Operator: "PUSH"},
{Operator: "DIV"},
{Operand: 4, Operator: "PUSH"},
{Operator: "MUL"},
{Operator: "FIB"},
{Operand: 5, Operator: "PUSH"},
{Operand: 6, Operator: "PUSH"},
{Operator: "SUB"},
}
wants := []float32{3, 1, 4, 0, 1, 1, 2, 3, -1}
testExecute_Live(t, client, instructions, wants)
}
source: server/machine_live_test.go
~/disk/E/workspace/grpc-eg-go
$ go test server/machine.go server/machine_live_test.go
ok command-line-arguments 0.005s
  • How to define an interface for bi-directional streaming RPC using protobuf
  • How to write gRPC server & client logic for bi-directional streaming RPC
  • How to write and run the unit test for bi-directional streaming RPC
  • How to write and run the unit test for bi-directional streaming RPC by running the server live leveraging the bufconn package
  • How to run the gRPC server and a client can communicate to it

--

--

--

An OSS Enthusiast

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

Benchmarking 1 million C# tasks vs Go goroutines: Is there any difference?

Time.deltaTime in Unity3D — Code Saying

OCR is easy with python

Selenium Priority for Selectors

A Simple Cloud-based Software Explainer

Alvin’s Dew Drop Daily — Issue #167

Create REST API in Laravel with authentication using Passport

Snowflake Role Queries

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Toran Sahu

Toran Sahu

An OSS Enthusiast

More from Medium

gRPC proto files Best Practices

Concurrency in Golang

Golang Default GOPROXY Hidden Networking Requirements

How to Share GRPC Proto Buffers Files Across Microservices