Part-3: Building a basic microservice with bidirectional-streaming gRPC using Golang

Objective

Introduction

  • both sides send a sequence of messages using a read-write stream
  • the two streams operate independently, so clients and servers can read and write in whatever order they like
  • for example, the server could wait to receive all the client messages before writing its responses, or it could alternately read a message then write a message, or some other combination of reads and writes

Implementing Bidirectional Streaming RPC

Update the protobuf

service Machine {
- rpc Execute(stream Instruction) returns (Result) {}
- rpc ServerStreamingExecute(InstructionSet) returns (stream Result) {}
+ rpc Execute(stream Instruction) returns (stream Result) {}
}
source: machine/machine.proto

Generating the updated client and server interface Go code

~/disk/E/workspace/grpc-eg-go
$ SRC_DIR=./
$ DST_DIR=$SRC_DIR
$ protoc \
-I=$SRC_DIR \
--go_out=plugins=grpc:$DST_DIR \
$SRC_DIR/machine/machine.proto
... type MachineServer interface {
Execute(Machine_ExecuteServer) error
- ServerStreamingExecute(*InstructionSet, Machine_ServerStreamingExecuteServer) error
}
... type MachineClient interface {
Execute(ctx context.Context, opts ...grpc.CallOption) (Machine_ExecuteClient, error)
- ServerStreamingExecute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (Machine_ServerStreamingExecuteClient, error)
}
... source: machine/machine.pb.go

Update the Server

func (s *MachineServer) Execute(stream machine.Machine_ExecuteServer) error {
var stack stack.Stack
for {
instruction, err := stream.Recv()
if err == io.EOF {
log.Println("EOF")
return nil
}
if err != nil {
return err
}
operand := instruction.GetOperand()
operator := instruction.GetOperator()
op_type := OperatorType(operator)
fmt.Printf("Operand: %v, Operator: %v\n", operand, operator) switch op_type {
case PUSH:
stack.Push(float32(operand))
case POP:
stack.Pop()
case ADD, SUB, MUL, DIV:
item2, popped := stack.Pop()
item1, popped := stack.Pop()
if !popped {
return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted")
}
var res float32
if op_type == ADD {
res = item1 + item2
} else if op_type == SUB {
res = item1 - item2
} else if op_type == MUL {
res = item1 * item2
} else if op_type == DIV {
res = item1 / item2
}
stack.Push(res)
if err := stream.Send(&machine.Result{Output: float32(res)}); err != nil {
return err
}
case FIB:
n, popped := stack.Pop()
if !popped {
return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted")
}
if op_type == FIB {
for f := range utils.FibonacciRange(int(n)) {
if err := stream.Send(&machine.Result{Output: float32(f)}); err != nil {
return err
}
}
}
default:
return status.Errorf(codes.Unimplemented, "Operation '%s' not implemented yet", operator)
}
}
}
source: server/machine.go

Update the Client

// runExecute opens a bidirectional Execute stream on client, sends every
// instruction in order (pausing 500ms after each one) and logs each result the
// server streams back. It blocks until the receiving goroutine has finished,
// and the whole call is bounded by a 10-second context timeout.
//
// Failures to open the stream, to send, or to close the send side terminate
// the process via log.Fatalf.
func runExecute(client machine.MachineClient, instructions []*machine.Instruction) {
	log.Printf("Streaming %v", instructions)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	stream, err := client.Execute(ctx)
	if err != nil {
		log.Fatalf("%v.Execute(ctx) = %v, %v: ", client, stream, err)
	}

	// waitc is closed when the receiver goroutine exits, for any reason.
	waitc := make(chan struct{})
	go func() {
		defer close(waitc)
		for {
			result, err := stream.Recv()
			if err == io.EOF {
				log.Println("EOF")
				return
			}
			if err != nil {
				// Stop receiving after an error: looping again would never
				// close waitc and would leave the caller blocked forever.
				log.Printf("Err: %v", err)
				return
			}
			log.Printf("output: %v", result.GetOutput())
		}
	}()

	for _, instruction := range instructions {
		if err := stream.Send(instruction); err != nil {
			log.Fatalf("%v.Send(%v) = %v: ", stream, instruction, err)
		}
		time.Sleep(500 * time.Millisecond)
	}
	if err := stream.CloseSend(); err != nil {
		log.Fatalf("%v.CloseSend() got error %v, want %v", stream, err, nil)
	}

	<-waitc
}
source: client/machine.go

Test

~/disk/E/workspace/grpc-eg-go
$ mockgen github.com/toransahu/grpc-eg-go/machine MachineClient,Machine_ExecuteServer,Machine_ExecuteClient > mock_machine/machine_mock.go

Server

// TestExecute drives MachineServer.Execute with a mocked server stream. The
// mock replays a fixed instruction sequence through Recv, ending with io.EOF,
// and records every Result passed to Send so the outputs can be compared with
// the expected values.
func TestExecute(t *testing.T) {
	s := MachineServer{}
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	mockServerStream := mock_machine.NewMockMachine_ExecuteServer(ctrl)

	// Recv must hand out the instructions in exactly this order and then
	// signal end-of-stream.
	gomock.InOrder(
		mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operand: 1, Operator: "PUSH"}, nil),
		mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operand: 2, Operator: "PUSH"}, nil),
		mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operator: "MUL"}, nil),
		mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operand: 3, Operator: "PUSH"}, nil),
		mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operator: "ADD"}, nil),
		mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operator: "FIB"}, nil),
		mockServerStream.EXPECT().Recv().Return(nil, io.EOF),
	)

	// Capture everything the server sends back.
	var mockResults []*machine.Result
	mockServerStream.EXPECT().Send(gomock.Any()).DoAndReturn(
		func(result *machine.Result) error {
			mockResults = append(mockResults, result)
			return nil
		}).AnyTimes()

	// 1*2 = 2, 2+3 = 5, then the FIB outputs for the popped 5.
	wants := []float32{2, 5, 0, 1, 1, 2, 3, 5}

	if err := s.Execute(mockServerStream); err != nil {
		t.Errorf("Execute(%v) got unexpected error: %v", mockServerStream, err)
	}

	// Without this check, too few results would pass silently and too many
	// would panic on wants[i].
	if len(mockResults) != len(wants) {
		t.Fatalf("got %d results, want %d", len(mockResults), len(wants))
	}
	for i, result := range mockResults {
		got := result.GetOutput()
		want := wants[i]
		if got != want {
			t.Errorf("result %d: got %v, want %v", i, got, want)
		}
	}
}
source: server/machine_test.go
~/disk/E/workspace/grpc-eg-go
$ go test server/machine.go server/machine_test.go
ok command-line-arguments 0.004s

Client

// testExecute exercises the client side of the Execute RPC against client:
// it opens the stream, sends PUSH 5, PUSH 6 and MUL, reads a single result and
// expects it to be 30. Failures are reported through t so the test runner
// records them instead of the process exiting.
func testExecute(t *testing.T, client machine.MachineClient) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	instructions := []*machine.Instruction{
		{Operand: 5, Operator: "PUSH"},
		{Operand: 6, Operator: "PUSH"},
		{Operator: "MUL"},
	}
	log.Printf("Streaming %v", instructions)

	stream, err := client.Execute(ctx)
	if err != nil {
		t.Fatalf("%v.Execute(%v) = _, %v: ", client, ctx, err)
	}
	for _, instruction := range instructions {
		if err := stream.Send(instruction); err != nil {
			t.Fatalf("%v.Send(%v) = %v: ", stream, instruction, err)
		}
	}

	result, err := stream.Recv()
	if err != nil {
		t.Fatalf("%v.Recv() got error %v, want %v", stream, err, nil)
	}

	got := result.GetOutput()
	want := float32(30)
	if got != want {
		t.Errorf("got %v, want %v", got, want)
	}
}
// TestExecute runs testExecute against a fully mocked MachineClient: Execute
// returns a mocked client stream that accepts any number of Send calls and
// answers a single Recv with a Result whose Output is 30.
func TestExecute(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	// The stream handed back by the mocked Execute call.
	streamMock := mock_machine.NewMockMachine_ExecuteClient(ctrl)
	streamMock.EXPECT().
		Recv().
		Return(&machine.Result{Output: 30}, nil)
	streamMock.EXPECT().
		Send(gomock.Any()).
		Return(nil).
		AnyTimes()

	// The client itself; the context argument is not inspected.
	clientMock := mock_machine.NewMockMachineClient(ctrl)
	clientMock.EXPECT().
		Execute(gomock.Any()).
		Return(streamMock, nil)

	testExecute(t, clientMock)
}
source: mock_machine/machine_mock_test.go
~/disk/E/workspace/grpc-eg-go
$ go test mock_machine/machine_mock.go mock_machine/machine_mock_test.go
ok command-line-arguments 0.003s

Run

Server

~/disk/E/workspace/grpc-eg-go
$ go run cmd/run_machine_server.go

Client

~/disk/E/workspace/grpc-eg-go
$ go run client/machine.go
Streaming [operator:"PUSH" operand:1 operator:"PUSH" operand:2 operator:"ADD" operator:"PUSH" operand:3 operator:"DIV" operator:"PUSH" operand:4 operator:"MUL" operator:"FIB" operator:"PUSH" operand:5 operator:"PUSH" operand:6 operator:"SUB" ]
output: 3
output: 1
output: 4
output: 0
output: 1
output: 1
output: 2
output: 3
output: -1
EOF

Bonus

  1. how many dependencies are there
  2. which are the essential & most used dependencies
  3. is it feasible to install dependencies on the test (and even the developer’s) environment, etc.
const bufSize = 1024 * 1024var lis *bufconn.Listenerfunc init() {
lis = bufconn.Listen(bufSize)
s := grpc.NewServer()
machine.RegisterMachineServer(s, &MachineServer{})
go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("Server exited with error: %v", err)
}
}()
}
// bufDialer is a dialer for grpc.WithContextDialer: it ignores the supplied
// context and address and connects through the package-level listener lis.
func bufDialer(_ context.Context, _ string) (net.Conn, error) {
	conn, err := lis.Dial()
	return conn, err
}
// testExecute_Live streams instructions to a live server through client and
// checks that the results streamed back equal wants, element by element and
// in count. The call is bounded by a 10-second context timeout.
//
// Results are received on a separate goroutine while the instructions are
// being sent. The function always waits for that goroutine before returning,
// so it never reports on t after the test has finished.
func testExecute_Live(t *testing.T, client machine.MachineClient, instructions []*machine.Instruction, wants []float32) {
	log.Printf("Streaming %v", instructions)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	stream, err := client.Execute(ctx)
	if err != nil {
		t.Fatalf("%v.Execute(ctx) = %v, %v: ", client, stream, err)
	}

	// received is written only by the receiver goroutine and read only after
	// waitc is closed, so no extra synchronization is needed.
	received := 0
	waitc := make(chan struct{})
	go func() {
		defer close(waitc)
		for {
			result, err := stream.Recv()
			if err == io.EOF {
				log.Println("EOF")
				return
			}
			if err != nil {
				// Stop on error; looping again would never close waitc and
				// the test would hang.
				t.Errorf("Recv() got unexpected error: %v", err)
				return
			}

			got := result.GetOutput()
			log.Printf("output: %v", got)
			if received >= len(wants) {
				// Guard the index so extra results fail the test instead of
				// panicking.
				t.Errorf("unexpected extra result %v, want only %d results", got, len(wants))
			} else if want := wants[received]; got != want {
				t.Errorf("result %d: got %v, want %v", received, got, want)
			}
			received++
		}
	}()

	// Send failures use Errorf and fall through (rather than Fatalf) so the
	// receiver goroutine is still awaited below.
	for _, instruction := range instructions {
		if err := stream.Send(instruction); err != nil {
			t.Errorf("%v.Send(%v) = %v: ", stream, instruction, err)
			break
		}
	}
	if err := stream.CloseSend(); err != nil {
		t.Errorf("%v.CloseSend() got error %v, want %v", stream, err, nil)
	}

	<-waitc

	if received != len(wants) {
		t.Errorf("got %d results, want %d", received, len(wants))
	}
}
// TestExecute_Live dials the server started in init through bufDialer and
// runs a fixed program against it with testExecute_Live.
func TestExecute_Live(t *testing.T) {
	conn, err := grpc.DialContext(
		context.Background(),
		"bufnet",
		grpc.WithContextDialer(bufDialer),
		grpc.WithInsecure(),
	)
	if err != nil {
		t.Fatalf("Failed to dial bufnet: %v", err)
	}
	defer conn.Close()

	// Program under test, together with the outputs expected from Execute().
	program := []*machine.Instruction{
		{Operand: 1, Operator: "PUSH"},
		{Operand: 2, Operator: "PUSH"},
		{Operator: "ADD"},
		{Operand: 3, Operator: "PUSH"},
		{Operator: "DIV"},
		{Operand: 4, Operator: "PUSH"},
		{Operator: "MUL"},
		{Operator: "FIB"},
		{Operand: 5, Operator: "PUSH"},
		{Operand: 6, Operator: "PUSH"},
		{Operator: "SUB"},
	}
	expected := []float32{3, 1, 4, 0, 1, 1, 2, 3, -1}

	testExecute_Live(t, machine.NewMachineClient(conn), program, expected)
}
source: server/machine_live_test.go
~/disk/E/workspace/grpc-eg-go
$ go test server/machine.go server/machine_live_test.go
ok command-line-arguments 0.005s
  • How to define an interface for bi-directional streaming RPC using protobuf
  • How to write gRPC server & client logic for bi-directional streaming RPC
  • How to write and run the unit test for bi-directional streaming RPC
  • How to write and run the unit test for bi-directional streaming RPC by running the server live leveraging the bufconn package
  • How to run the gRPC server and have a client communicate with it

--

--

--

An OSS Enthusiast

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

Automated change management in a DevOps world

BYOD : Build your own drone… multi-rotor quadcopter that is

Stack vs Heap memory in Java

WordPress powerhouse

A manifesto for making stuff with software

How to Build C/C++ Barcode Reader App on Raspberry Pi 4

[Guide] How to Mint

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Toran Sahu

Toran Sahu

An OSS Enthusiast

More from Medium

What is Concurrency in Golang ?

gRPC Request Routing (Header Based) AWS ALB

Integration tests, Docker and how it all Go(es) together

gorilla/mux 101 (rk-boot): Swagger UI