Part-2: Building a basic microservice with unidirectional-streaming gRPC using Golang

Objective

Introduction

  • the client writes a sequence of messages and sends them to the server using a provided stream
  • once the client has finished writing the messages, it waits for the server to read them and return its response
  • the client sends a request to the server and gets a stream to read a sequence of messages back
  • the client reads from the returned stream until there are no more messages

Implementing Server Streaming RPC

  • perform a Fibonacci operation
  • accept an integer input i.e. generate first N numbers of the Fibonacci series
  • will respond with a stream of integers i.e. first N numbers of the Fibonacci series

Update the protobuf

  • A server streaming RPC where the client sends a request to the server using the stub and waits for a response to come back as a stream of result
  • To specify a server-side streaming method, need to place the stream keyword before the response type
// ServerStreamingExecute accepts a set of Instructions from client and returns a stream of Result.
rpc ServerStreamingExecute(InstructionSet) returns (stream Result) {}
source: machine/machine.proto

Generating the updated client and server interface Go code

~/disk/E/workspace/grpc-eg-go
$ SRC_DIR=./
$ DST_DIR=$SRC_DIR
$ protoc \
-I=$SRC_DIR \
--go_out=plugins=grpc:$DST_DIR \
$SRC_DIR/machine/machine.proto
... type MachineClient interface {
Execute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (*Result, error)
+ ServerStreamingExecute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (Machine_ServerStreamingExecuteClient, error)
}
... type MachineServer interface {
Execute(context.Context, *InstructionSet) (*Result, error)
+ ServerStreamingExecute(*InstructionSet, Machine_ServerStreamingExecuteServer) error
}
source: machine/machine.pb.go

Update the Server

~/disk/E/workspace/grpc-eg-go
$ go run cmd/run_machine_server.go
# command-line-arguments
cmd/run_machine_server.go:32:44: cannot use &server.MachineServer literal (type *server.MachineServer) as type machine.MachineServer in argument to machine.RegisterMachineServer:
*server.MachineServer does not implement machine.MachineServer (missing ServerStreamingExecute method)
// ServerStreamingExecute runs the set of instructions given and streams a sequence of Results.
func (s *MachineServer) ServerStreamingExecute(instructions *machine.InstructionSet, stream machine.Machine_ServerStreamingExecuteServer) error {
return status.Error(codes.Unimplemented, "ServerStreamingExecute() not implemented yet")
}
source: server/machine.go
package utilsfunc FibonacciRange(n int) <-chan int {
ch := make(chan int)
fn := make([]int, n+1, n+2)
fn[0] = 0
fn[1] = 1
go func() {
defer close(ch)
for i := 0; i <= n; i++ {
var f int
if i < 2 {
f = fn[i]
} else {
f = fn[i-1] + fn[i-2]
}
fn[i] = f
ch <- f
}
}()
return ch
}
source: utils/fibonacci.go
package utilsimport (
"testing"
)
func TestFibonacciRange(t *testing.T) {
fibOf5 := []int{0, 1, 1, 2, 3, 5}
i := 0
for f := range FibonacciRange(5) {
if f != fibOf5[i] {
t.Errorf("got %d, want %d", f, fibOf5[i])
}
i++
}
}
source: utils/fibonacci_test.go
func (s *MachineServer) ServerStreamingExecute(instructions *machine.InstructionSet, stream machine.Machine_ServerStreamingExecuteServer) error {
if len(instructions.GetInstructions()) == 0 {
return status.Error(codes.InvalidArgument, "No valid instructions received")
}
var stack stack.Stack for _, instruction := range instructions.GetInstructions() {
operand := instruction.GetOperand()
operator := instruction.GetOperator()
op_type := OperatorType(operator)
log.Printf("Operand: %v, Operator: %v\n", operand, operator) switch op_type {
case PUSH:
stack.Push(float32(operand))
case POP:
stack.Pop()
case FIB:
n, popped := stack.Pop()
if !popped {
return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted")
}
if op_type == FIB {
for f := range utils.FibonacciRange(int(n)) {
log.Println(float32(f))
stream.Send(&machine.Result{Output: float32(f)})
}
}
default:
return status.Errorf(codes.Unimplemented, "Operation '%s' not implemented yet", operator)
}
}
return nil
}
source: server/machine.go

Update the Client

func runServerStreamingExecute(client machine.MachineClient, instructions *machine.InstructionSet) {
log.Printf("Executing %v", instructions)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.ServerStreamingExecute(ctx, instructions)
if err != nil {
log.Fatalf("%v.Execute(_) = _, %v: ", client, err)
}
for {
result, err := stream.Recv()
if err == io.EOF {
log.Println("EOF")
break
}
if err != nil {
log.Printf("Err: %v", err)
break
}
log.Printf("output: %v", result.GetOutput())
}
log.Println("DONE!")
}
source: client/machine.go

Test

Server

~/disk/E/workspace/grpc-eg-go
$ mockgen github.com/toransahu/grpc-eg-go/machine MachineClient,Machine_ServerStreamingExecuteServer > mock_machine/machine_mock.go
func TestServerStreamingExecute(t *testing.T) {
s := MachineServer{}
// set up test table
tests := []struct {
instructions []*machine.Instruction
want []float32
}{
{
instructions: []*machine.Instruction{
{Operand: 5, Operator: "PUSH"},
{Operator: "FIB"},
},
want: []float32{0, 1, 1, 2, 3, 5},
},
{
instructions: []*machine.Instruction{
{Operand: 6, Operator: "PUSH"},
{Operator: "FIB"},
},
want: []float32{0, 1, 1, 2, 3, 5, 8},
},
}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockServerStream := mock_machine.NewMockMachine_ServerStreamingExecuteServer(ctrl)
for _, tt := range tests {
mockResults := []*machine.Result{}
mockServerStream.EXPECT().Send(gomock.Any()).DoAndReturn(
func(result *machine.Result) error {
mockResults = append(mockResults, result)
return nil
}).AnyTimes()
req := &machine.InstructionSet{Instructions: tt.instructions} err := s.ServerStreamingExecute(req, mockServerStream)
if err != nil {
t.Errorf("ServerStreamingExecute(%v) got unexpected error: %v", req, err)
}
for i, result := range mockResults {
got := result.GetOutput()
want := tt.want[i]
if got != want {
t.Errorf("got %v, want %v", got, want)
}
}
}
}
source: server/machine_test.go
~/disk/E/workspace/grpc-eg-go
$ go test server/machine.go server/machine_test.go
ok command-line-arguments 0.003s

Client

~/disk/E/workspace/grpc-eg-go
$ mockgen github.com/toransahu/grpc-eg-go/machine MachineClient,Machine_ServerStreamingExecuteServer,Machine_ServerStreamingExecuteClient > mock_machine/machine_mock.go
source: mock_machine/machine_mock.go
func TestServerStreamingExecute(t *testing.T) {
instructions := []*machine.Instruction{}
instructions = append(instructions, &machine.Instruction{Operand: 1, Operator: "PUSH"})
instructions = append(instructions, &machine.Instruction{Operator: "FIB"})
instructionSet := &machine.InstructionSet{Instructions: instructions}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockMachineClient := mock_machine.NewMockMachineClient(ctrl)
clientStream := mock_machine.NewMockMachine_ServerStreamingExecuteClient(ctrl)
clientStream.EXPECT().Recv().Return(&machine.Result{Output: 0}, nil) mockMachineClient.EXPECT().ServerStreamingExecute(
gomock.Any(), // context
instructionSet, // rpc uniary message
).Return(clientStream, nil)
if err := testServerStreamingExecute(t, mockMachineClient, instructionSet); err != nil {
t.Fatalf("Test failed: %v", err)
}
}
source: mock_machine/machine_mock_test.go
~/disk/E/workspace/grpc-eg-go
$ go test mock_machine/machine_mock.go mock_machine/machine_mock_test.go
ok command-line-arguments 0.003s

Run

Server

~/disk/E/workspace/grpc-eg-go
$ go run cmd/run_machine_server.go

Client

~/disk/E/workspace/grpc-eg-go
$ go run client/machine.go
Executing instructions:<operator:"PUSH" operand:5 > instructions:<operator:"PUSH" operand:6 > instructions:<operator:"MUL" >
output:30
Executing instructions:<operator:"PUSH" operand:6 > instructions:<operator:"FIB" >
output: 0
output: 1
output: 1
output: 2
output: 3
output: 5
output: 8
EOF
DONE!

Implementing Client Streaming RPC

Update the protobuf

service Machine {
- rpc Execute(InstructionSet) returns (Result) {}
+ rpc Execute(stream Instruction) returns (Result) {}
rpc ServerStreamingExecute(InstructionSet) returns (stream Result) {}
}
source: machine/machine.proto

Generating the updated client and server interface Go code

~/disk/E/workspace/grpc-eg-go
$ SRC_DIR=./
$ DST_DIR=$SRC_DIR
$ protoc \
-I=$SRC_DIR \
--go_out=plugins=grpc:$DST_DIR \
$SRC_DIR/machine/machine.proto
type MachineServer interface {
- Execute(context.Context, *InstructionSet) (*Result, error)
+ Execute(Machine_ExecuteServer) error
ServerStreamingExecute(*InstructionSet, Machine_ServerStreamingExecuteServer) error
}
type MachineClient interface {
- Execute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (*Result, error)
+ Execute(ctx context.Context, opts ...grpc.CallOption) (Machine_ExecuteClient, error)
ServerStreamingExecute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (Machine_ServerStreamingExecuteClient, error)
}
source: machine/machine.pb.go

Update the Server

func (s *MachineServer) Execute(stream machine.Machine_ExecuteServer) error {
var stack stack.Stack
for {
instruction, err := stream.Recv()
if err == io.EOF {
log.Println("EOF")
output, popped := stack.Pop()
if !popped {
return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted")
}
if err := stream.SendAndClose(&machine.Result{
Output: output,
}); err != nil {
return err
}
return nil
}
if err != nil {
return err
}
operand := instruction.GetOperand()
operator := instruction.GetOperator()
op_type := OperatorType(operator)
fmt.Printf("Operand: %v, Operator: %v\n", operand, operator) switch op_type {
case PUSH:
stack.Push(float32(operand))
case POP:
stack.Pop()
case ADD, SUB, MUL, DIV:
item2, popped := stack.Pop()
item1, popped := stack.Pop()
if !popped {
return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted")
}
if op_type == ADD {
stack.Push(item1 + item2)
} else if op_type == SUB {
stack.Push(item1 - item2)
} else if op_type == MUL {
stack.Push(item1 * item2)
} else if op_type == DIV {
stack.Push(item1 / item2)
}
default:
return status.Errorf(codes.Unimplemented, "Operation '%s' not implemented yet", operator)
}
}
}
source: server/machine.go

Update the Client

func runExecute(client machine.MachineClient, instructions *machine.InstructionSet) {
log.Printf("Streaming %v", instructions)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.Execute(ctx)
if err != nil {
log.Fatalf("%v.Execute(ctx) = %v, %v: ", client, stream, err)
}
for _, instruction := range instructions.GetInstructions() {
if err := stream.Send(instruction); err != nil {
log.Fatalf("%v.Send(%v) = %v: ", stream, instruction, err)
}
}
result, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
log.Println(result)
}
source: client/machine.go

Test

~/disk/E/workspace/grpc-eg-go
$ mockgen github.com/toransahu/grpc-eg-go/machine MachineClient,Machine_ServerStreamingExecuteClient,Machine_ServerStreamingExecuteServer,Machine_ExecuteServer,Machine_ExecuteClient > mock_machine/machine_mock.go

Server

func TestExecute(t *testing.T) {
s := MachineServer{}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockServerStream := mock_machine.NewMockMachine_ExecuteServer(ctrl)
mockResult := &machine.Result{}
callRecv1 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operand: 5, Operator: "PUSH"}, nil)
callRecv2 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operand: 6, Operator: "PUSH"}, nil).After(callRecv1)
callRecv3 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operator: "MUL"}, nil).After(callRecv2)
mockServerStream.EXPECT().Recv().Return(nil, io.EOF).After(callRecv3)
mockServerStream.EXPECT().SendAndClose(gomock.Any()).DoAndReturn(
func(result *machine.Result) error {
mockResult = result
return nil
})
err := s.Execute(mockServerStream)
if err != nil {
t.Errorf("Execute(%v) got unexpected error: %v", mockServerStream, err)
}
got := mockResult.GetOutput()
want := float32(30)
if got != want {
t.Errorf("got %v, wanted %v", got, want)
}
}
source: server/machine_test.go
~/disk/E/workspace/grpc-eg-go
$ go test server/machine.go server/machine_test.go
ok command-line-arguments 0.003s

Client

func TestExecute(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockMachineClient := mock_machine.NewMockMachineClient(ctrl)
mockClientStream := mock_machine.NewMockMachine_ExecuteClient(ctrl)
mockClientStream.EXPECT().Send(gomock.Any()).Return(nil).AnyTimes()
mockClientStream.EXPECT().CloseAndRecv().Return(&machine.Result{Output: 30}, nil)
mockMachineClient.EXPECT().Execute(
gomock.Any(), // context
).Return(mockClientStream, nil)
testExecute(t, mockMachineClient)
}
source: mock_machine/machine_mock_test.go
~/disk/E/workspace/grpc-eg-go
$ go test mock_machine/machine_mock.go mock_machine/machine_mock_test.go
ok command-line-arguments 0.003s
~/disk/E/workspace/grpc-eg-go
$ go test ./...
? github.com/toransahu/grpc-eg-go/client [no test files]
? github.com/toransahu/grpc-eg-go/cmd [no test files]
? github.com/toransahu/grpc-eg-go/machine [no test files]
ok github.com/toransahu/grpc-eg-go/mock_machine (cached)
ok github.com/toransahu/grpc-eg-go/server (cached)
ok github.com/toransahu/grpc-eg-go/utils (cached)
? github.com/toransahu/grpc-eg-go/utils/stack [no test files]

Run

Server

~/disk/E/workspace/grpc-eg-go
$ go run cmd/run_machine_server.go

Client

~/disk/E/workspace/grpc-eg-go
$ go run client/machine.go
Streaming instructions:<operator:"PUSH" operand:5 > instructions:<operator:"PUSH" operand:6 > instructions:<operator:"MUL" >
output:30
Executing instructions:<operator:"PUSH" operand:6 > instructions:<operator:"FIB" >
output: 0
output: 1
output: 1
output: 2
output: 3
output: 5
output: 8
EOF
DONE!
  • How to define an interface for uni-directional streaming RPCs using protobuf
  • How to write gRPC server & client logic for uni-directional streaming RPCs
  • How to write and run the unit test for server-streaming & client-streaming RPCs
  • How to run the gRPC server and a client can communicate to it

--

--

--

An OSS Enthusiast

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

Spacelens x Im Community AMA Recap

OnColliderEnter vs OnTriggerEnter - Overview of Collisions

August Meetup Madness in #Melbourne

Prometheus with Grafana visualization for Linux server monitoring in 5 easy steps

Prometheus with Grafana visualization for Linux server monitoring in 5 easy steps

A concise news update for those with busy schedules ;)

Custom Calculators in MediaPipe

Stuck in a loophole with the basics of OOP in Python? Not after reading this blog.

Graph Data Structure

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Toran Sahu

Toran Sahu

An OSS Enthusiast

More from Medium

Tracking UI: Why tyny.dev Has The Best Command of UI On The Market

AWS Serverless Framework — Configure Custom Domain for API Gateway

How to setup GoPhish Phishing server on AWS

Hacking on the cloud: An effective and cost optimization approach