Part-2: Building a basic microservice with unidirectional-streaming gRPC using Golang

Objective

In part 1 of this blog series, we learned the basics of gRPC: how to implement a Simple/Unary gRPC, how to write unit tests, and how to launch the server & client. Part 1 walks you through a step-by-step guide to implementing a Stack Machine server & client using Simple/Unary RPC.

If you’ve missed that, it is highly recommended to go through it to get familiar with the basics of the gRPC framework.

Introduction

Implementing Server Streaming RPC

Update the protobuf

// ServerStreamingExecute accepts a set of Instructions from client and returns a stream of Result.
rpc ServerStreamingExecute(InstructionSet) returns (stream Result) {}
source: machine/machine.proto

Generating the updated client and server interface Go code

~/disk/E/workspace/grpc-eg-go
$ SRC_DIR=./
$ DST_DIR=$SRC_DIR
$ protoc \
-I=$SRC_DIR \
--go_out=plugins=grpc:$DST_DIR \
$SRC_DIR/machine/machine.proto
... type MachineClient interface {
Execute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (*Result, error)
+ ServerStreamingExecute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (Machine_ServerStreamingExecuteClient, error)
}
... type MachineServer interface {
Execute(context.Context, *InstructionSet) (*Result, error)
+ ServerStreamingExecute(*InstructionSet, Machine_ServerStreamingExecuteServer) error
}
source: machine/machine.pb.go

Update the Server

~/disk/E/workspace/grpc-eg-go
$ go run cmd/run_machine_server.go
# command-line-arguments
cmd/run_machine_server.go:32:44: cannot use &server.MachineServer literal (type *server.MachineServer) as type machine.MachineServer in argument to machine.RegisterMachineServer:
*server.MachineServer does not implement machine.MachineServer (missing ServerStreamingExecute method)
// ServerStreamingExecute is the placeholder that makes MachineServer satisfy
// the regenerated machine.MachineServer interface again. It performs no work
// yet and always reports codes.Unimplemented to the caller.
func (s *MachineServer) ServerStreamingExecute(instructions *machine.InstructionSet, stream machine.Machine_ServerStreamingExecuteServer) error {
	notImplemented := status.Error(codes.Unimplemented, "ServerStreamingExecute() not implemented yet")
	return notImplemented
}
source: server/machine.go
// package utils — this snippet lives in utils/fibonacci.go.

// FibonacciRange returns a receive-only channel that yields the Fibonacci
// numbers F(0) through F(n), in order, and is then closed.
//
// For n < 0 nothing is sent and the channel is closed immediately; for n == 0
// only F(0) = 0 is sent.
//
// The values are produced by a goroutine writing to an unbuffered channel, so
// the caller must receive until the channel is closed; a consumer that stops
// early leaves the producer goroutine blocked on its next send.
func FibonacciRange(n int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		// Only the two most recent terms are needed, so no backing slice is
		// allocated (and there is no out-of-range write when n < 1).
		prev, cur := 0, 1
		for i := 0; i <= n; i++ {
			ch <- prev
			prev, cur = cur, prev+cur
		}
	}()
	return ch
}
source: utils/fibonacci.go

This blog series assumes that you’re familiar with the basics of Golang and with its concurrency paradigms and concepts, such as channels. You can read more about channels in the official documentation.

package utils

import (
"testing"
)
// TestFibonacciRange verifies that FibonacciRange(5) yields exactly
// F(0)..F(5), in order.
//
// The channel is drained completely before any assertion, so the number of
// values is checked too: a sequence that is too short no longer passes
// silently, and one that is too long no longer panics on an out-of-range index.
func TestFibonacciRange(t *testing.T) {
	want := []int{0, 1, 1, 2, 3, 5}

	var got []int
	for f := range FibonacciRange(5) {
		got = append(got, f)
	}

	if len(got) != len(want) {
		t.Fatalf("got %d values %v, want %d values %v", len(got), got, len(want), want)
	}
	for i := range want {
		if got[i] != want[i] {
			t.Errorf("value %d: got %d, want %d", i, got[i], want[i])
		}
	}
}
source: utils/fibonacci_test.go
func (s *MachineServer) ServerStreamingExecute(instructions *machine.InstructionSet, stream machine.Machine_ServerStreamingExecuteServer) error {
if len(instructions.GetInstructions()) == 0 {
return status.Error(codes.InvalidArgument, "No valid instructions received")
}
var stack stack.Stack for _, instruction := range instructions.GetInstructions() {
operand := instruction.GetOperand()
operator := instruction.GetOperator()
op_type := OperatorType(operator)
log.Printf("Operand: %v, Operator: %v\n", operand, operator) switch op_type {
case PUSH:
stack.Push(float32(operand))
case POP:
stack.Pop()
case FIB:
n, popped := stack.Pop()
if !popped {
return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted")
}
if op_type == FIB {
for f := range utils.FibonacciRange(int(n)) {
log.Println(float32(f))
stream.Send(&machine.Result{Output: float32(f)})
}
}
default:
return status.Errorf(codes.Unimplemented, "Operation '%s' not implemented yet", operator)
}
}
return nil
}
source: server/machine.go

Update the Client

// runServerStreamingExecute sends the instruction set over the
// ServerStreamingExecute RPC and logs every Result it receives until the
// server closes the stream (io.EOF) or a receive error occurs. The whole call
// is bounded by a 10-second timeout.
//
// A failure to open the stream is fatal; a failure while receiving is logged
// and ends the loop.
func runServerStreamingExecute(client machine.MachineClient, instructions *machine.InstructionSet) {
	log.Printf("Executing %v", instructions)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	stream, err := client.ServerStreamingExecute(ctx, instructions)
	if err != nil {
		// Report the RPC that was actually invoked (the message previously
		// named the unary Execute call).
		log.Fatalf("%v.ServerStreamingExecute(_) = _, %v: ", client, err)
	}

	for {
		result, err := stream.Recv()
		if err == io.EOF {
			// The server finished streaming results.
			log.Println("EOF")
			break
		}
		if err != nil {
			log.Printf("Err: %v", err)
			break
		}
		log.Printf("output: %v", result.GetOutput())
	}
	log.Println("DONE!")
}
source: client/machine.go

Test

~/disk/E/workspace/grpc-eg-go
$ mockgen github.com/toransahu/grpc-eg-go/machine MachineClient,Machine_ServerStreamingExecuteServer > mock_machine/machine_mock.go
func TestServerStreamingExecute(t *testing.T) {
s := MachineServer{}
// set up test table
tests := []struct {
instructions []*machine.Instruction
want []float32
}{
{
instructions: []*machine.Instruction{
{Operand: 5, Operator: "PUSH"},
{Operator: "FIB"},
},
want: []float32{0, 1, 1, 2, 3, 5},
},
{
instructions: []*machine.Instruction{
{Operand: 6, Operator: "PUSH"},
{Operator: "FIB"},
},
want: []float32{0, 1, 1, 2, 3, 5, 8},
},
}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockServerStream := mock_machine.NewMockMachine_ServerStreamingExecuteServer(ctrl)
for _, tt := range tests {
mockResults := []*machine.Result{}
mockServerStream.EXPECT().Send(gomock.Any()).DoAndReturn(
func(result *machine.Result) error {
mockResults = append(mockResults, result)
return nil
}).AnyTimes()
req := &machine.InstructionSet{Instructions: tt.instructions} err := s.ServerStreamingExecute(req, mockServerStream)
if err != nil {
t.Errorf("ServerStreamingExecute(%v) got unexpected error: %v", req, err)
}
for i, result := range mockResults {
got := result.GetOutput()
want := tt.want[i]
if got != want {
t.Errorf("got %v, want %v", got, want)
}
}
}
}
source: server/machine_test.go
~/disk/E/workspace/grpc-eg-go
$ go test server/machine.go server/machine_test.go
ok command-line-arguments 0.003s
~/disk/E/workspace/grpc-eg-go
$ mockgen github.com/toransahu/grpc-eg-go/machine MachineClient,Machine_ServerStreamingExecuteServer,Machine_ServerStreamingExecuteClient > mock_machine/machine_mock.go
source: mock_machine/machine_mock.go
func TestServerStreamingExecute(t *testing.T) {
instructions := []*machine.Instruction{}
instructions = append(instructions, &machine.Instruction{Operand: 1, Operator: "PUSH"})
instructions = append(instructions, &machine.Instruction{Operator: "FIB"})
instructionSet := &machine.InstructionSet{Instructions: instructions}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockMachineClient := mock_machine.NewMockMachineClient(ctrl)
clientStream := mock_machine.NewMockMachine_ServerStreamingExecuteClient(ctrl)
clientStream.EXPECT().Recv().Return(&machine.Result{Output: 0}, nil) mockMachineClient.EXPECT().ServerStreamingExecute(
gomock.Any(), // context
instructionSet, // rpc uniary message
).Return(clientStream, nil)
if err := testServerStreamingExecute(t, mockMachineClient, instructionSet); err != nil {
t.Fatalf("Test failed: %v", err)
}
}
source: mock_machine/machine_mock_test.go
~/disk/E/workspace/grpc-eg-go
$ go test mock_machine/machine_mock.go mock_machine/machine_mock_test.go
ok command-line-arguments 0.003s

Run

~/disk/E/workspace/grpc-eg-go
$ go run cmd/run_machine_server.go
~/disk/E/workspace/grpc-eg-go
$ go run client/machine.go
Executing instructions:<operator:"PUSH" operand:5 > instructions:<operator:"PUSH" operand:6 > instructions:<operator:"MUL" >
output:30
Executing instructions:<operator:"PUSH" operand:6 > instructions:<operator:"FIB" >
output: 0
output: 1
output: 1
output: 2
output: 3
output: 5
output: 8
EOF
DONE!

Implementing Client Streaming RPC

Update the protobuf

service Machine {
- rpc Execute(InstructionSet) returns (Result) {}
+ rpc Execute(stream Instruction) returns (Result) {}
rpc ServerStreamingExecute(InstructionSet) returns (stream Result) {}
}
source: machine/machine.proto

Generating the updated client and server interface Go code

~/disk/E/workspace/grpc-eg-go
$ SRC_DIR=./
$ DST_DIR=$SRC_DIR
$ protoc \
-I=$SRC_DIR \
--go_out=plugins=grpc:$DST_DIR \
$SRC_DIR/machine/machine.proto
type MachineServer interface {
- Execute(context.Context, *InstructionSet) (*Result, error)
+ Execute(Machine_ExecuteServer) error
ServerStreamingExecute(*InstructionSet, Machine_ServerStreamingExecuteServer) error
}
type MachineClient interface {
- Execute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (*Result, error)
+ Execute(ctx context.Context, opts ...grpc.CallOption) (Machine_ExecuteClient, error)
ServerStreamingExecute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (Machine_ServerStreamingExecuteClient, error)
}
source: machine/machine.pb.go

Update the Server

func (s *MachineServer) Execute(stream machine.Machine_ExecuteServer) error {
var stack stack.Stack
for {
instruction, err := stream.Recv()
if err == io.EOF {
log.Println("EOF")
output, popped := stack.Pop()
if !popped {
return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted")
}
if err := stream.SendAndClose(&machine.Result{
Output: output,
}); err != nil {
return err
}
return nil
}
if err != nil {
return err
}
operand := instruction.GetOperand()
operator := instruction.GetOperator()
op_type := OperatorType(operator)
fmt.Printf("Operand: %v, Operator: %v\n", operand, operator) switch op_type {
case PUSH:
stack.Push(float32(operand))
case POP:
stack.Pop()
case ADD, SUB, MUL, DIV:
item2, popped := stack.Pop()
item1, popped := stack.Pop()
if !popped {
return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted")
}
if op_type == ADD {
stack.Push(item1 + item2)
} else if op_type == SUB {
stack.Push(item1 - item2)
} else if op_type == MUL {
stack.Push(item1 * item2)
} else if op_type == DIV {
stack.Push(item1 / item2)
}
default:
return status.Errorf(codes.Unimplemented, "Operation '%s' not implemented yet", operator)
}
}
}
source: server/machine.go

Update the Client

// runExecute opens the client-streaming Execute RPC, sends every instruction
// of the set one at a time, then closes the stream and logs the single Result
// returned by the server. The call is bounded by a 10-second timeout, and any
// failure is fatal.
func runExecute(client machine.MachineClient, instructions *machine.InstructionSet) {
	log.Printf("Streaming %v", instructions)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	stream, err := client.Execute(ctx)
	if err != nil {
		log.Fatalf("%v.Execute(ctx) = %v, %v: ", client, stream, err)
	}

	// Stream the instructions in their original order.
	pending := instructions.GetInstructions()
	for i := range pending {
		sendErr := stream.Send(pending[i])
		if sendErr != nil {
			log.Fatalf("%v.Send(%v) = %v: ", stream, pending[i], sendErr)
		}
	}

	// Half-close the stream and wait for the server's reply.
	result, err := stream.CloseAndRecv()
	if err != nil {
		log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
	}
	log.Println(result)
}
source: client/machine.go

Test

~/disk/E/workspace/grpc-eg-go
$ mockgen github.com/toransahu/grpc-eg-go/machine MachineClient,Machine_ServerStreamingExecuteClient,Machine_ServerStreamingExecuteServer,Machine_ExecuteServer,Machine_ExecuteClient > mock_machine/machine_mock.go
func TestExecute(t *testing.T) {
s := MachineServer{}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockServerStream := mock_machine.NewMockMachine_ExecuteServer(ctrl)
mockResult := &machine.Result{}
callRecv1 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operand: 5, Operator: "PUSH"}, nil)
callRecv2 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operand: 6, Operator: "PUSH"}, nil).After(callRecv1)
callRecv3 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operator: "MUL"}, nil).After(callRecv2)
mockServerStream.EXPECT().Recv().Return(nil, io.EOF).After(callRecv3)
mockServerStream.EXPECT().SendAndClose(gomock.Any()).DoAndReturn(
func(result *machine.Result) error {
mockResult = result
return nil
})
err := s.Execute(mockServerStream)
if err != nil {
t.Errorf("Execute(%v) got unexpected error: %v", mockServerStream, err)
}
got := mockResult.GetOutput()
want := float32(30)
if got != want {
t.Errorf("got %v, wanted %v", got, want)
}
}
source: server/machine_test.go
~/disk/E/workspace/grpc-eg-go
$ go test server/machine.go server/machine_test.go
ok command-line-arguments 0.003s
// TestExecute runs the client-side helper testExecute against a mocked
// MachineClient: Execute returns a mocked client stream that accepts any
// number of Send calls and answers CloseAndRecv with an output of 30.
func TestExecute(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	mockClientStream := mock_machine.NewMockMachine_ExecuteClient(ctrl)
	mockMachineClient := mock_machine.NewMockMachineClient(ctrl)

	// Opening the RPC (with any context) yields the mocked stream.
	mockMachineClient.EXPECT().
		Execute(gomock.Any()).
		Return(mockClientStream, nil)

	// Every Send succeeds; closing the stream returns the final result.
	mockClientStream.EXPECT().Send(gomock.Any()).Return(nil).AnyTimes()
	mockClientStream.EXPECT().CloseAndRecv().Return(&machine.Result{Output: 30}, nil)

	testExecute(t, mockMachineClient)
}
source: mock_machine/machine_mock_test.go
~/disk/E/workspace/grpc-eg-go
$ go test mock_machine/machine_mock.go mock_machine/machine_mock_test.go
ok command-line-arguments 0.003s
~/disk/E/workspace/grpc-eg-go
$ go test ./...
? github.com/toransahu/grpc-eg-go/client [no test files]
? github.com/toransahu/grpc-eg-go/cmd [no test files]
? github.com/toransahu/grpc-eg-go/machine [no test files]
ok github.com/toransahu/grpc-eg-go/mock_machine (cached)
ok github.com/toransahu/grpc-eg-go/server (cached)
ok github.com/toransahu/grpc-eg-go/utils (cached)
? github.com/toransahu/grpc-eg-go/utils/stack [no test files]

Run

~/disk/E/workspace/grpc-eg-go
$ go run cmd/run_machine_server.go
~/disk/E/workspace/grpc-eg-go
$ go run client/machine.go
Streaming instructions:<operator:"PUSH" operand:5 > instructions:<operator:"PUSH" operand:6 > instructions:<operator:"MUL" >
output:30
Executing instructions:<operator:"PUSH" operand:6 > instructions:<operator:"FIB" >
output: 0
output: 1
output: 1
output: 2
output: 3
output: 5
output: 8
EOF
DONE!

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store