Part-2: Building a basic microservice with unidirectional-streaming gRPC using Golang

Have you ever wondered while developing a REST API that if the server could have got the capability to stream responses using the same TCP connection? Or, reversely if the REST client could have got the capability to stream the requests to the server, this could have saved the cost of bringing up another service (like WebSocket) just for the sake of fulfilling such requirement.

Then REST isn’t the only API architecture available, and for such use-cases, the gRPC model has begun to play a crucial role. gRPC’s unidirectional-streaming RPC feature has got your back on those requirements.

Objective

Previously in the part-1 of this blog series, we’ve learned the basics of gRPC, how to implement a Simple/Unary gRPC, how to write unit tests, how to launch the server & client. part-1 walks you through a step-by-step guide to implement a Stack Machine server & client leveraging Simple/Unary RPC.

If you’ve missed that, it is highly recommended to go through it to get familiar with the basics of the gRPC framework.

Introduction

Client streaming RPCs where:

  • the client writes a sequence of messages and sends them to the server using a provided stream
  • once the client has finished writing the messages, it waits for the server to read them and return its response

Server streaming RPCs where:

  • the client sends a request to the server and gets a stream to read a sequence of messages back
  • the client reads from the returned stream until there are no more messages

The best thing is gRPC guarantees message ordering within an individual RPC call.

Now let’s improve the “Stack Machine” server & client codes to support unidirectional streaming.

Implementing Server Streaming RPC

Where the FIB RPC will:

  • perform a Fibonacci operation
  • accept an integer input i.e. generate first N numbers of the Fibonacci series
  • will respond with a stream of integers i.e. first N numbers of the Fibonacci series

And later we’ll see how Client Streaming can be implemented so that a client can input a stream of Instructions to the Stack Machine in real-time rather than sending a single request comprised of a set of Instructions.

Update the protobuf

  • A server streaming RPC where the client sends a request to the server using the stub and waits for a response to come back as a stream of result
  • To specify a server-side streaming method, need to place the stream keyword before the response type
// ServerStreamingExecute accepts a set of Instructions from client and returns a stream of Result.
rpc ServerStreamingExecute(InstructionSet) returns (stream Result) {}
source: machine/machine.proto

Generating the updated client and server interface Go code

~/disk/E/workspace/grpc-eg-go
$ SRC_DIR=./
$ DST_DIR=$SRC_DIR
$ protoc \
-I=$SRC_DIR \
--go_out=plugins=grpc:$DST_DIR \
$SRC_DIR/machine/machine.proto

You can observe that the declaration of ServerStreamingExecute() in the MachineClient and MachineServer interface has been auto-generated:

... type MachineClient interface {
Execute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (*Result, error)
+ ServerStreamingExecute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (Machine_ServerStreamingExecuteClient, error)
}
... type MachineServer interface {
Execute(context.Context, *InstructionSet) (*Result, error)
+ ServerStreamingExecute(*InstructionSet, Machine_ServerStreamingExecuteServer) error
}
source: machine/machine.pb.go

Update the Server

~/disk/E/workspace/grpc-eg-go
$ go run cmd/run_machine_server.go
# command-line-arguments
cmd/run_machine_server.go:32:44: cannot use &server.MachineServer literal (type *server.MachineServer) as type machine.MachineServer in argument to machine.RegisterMachineServer:
*server.MachineServer does not implement machine.MachineServer (missing ServerStreamingExecute method)

So, it’s always the best practice to keep your service in sync with the service definition i.e. machine/machine.proto & machine/machine.pb.go. If you do not want to support a particular RPC, or its implementation is not yet ready, just respond with Unimplemented error status. Example:

// ServerStreamingExecute runs the set of instructions given and streams a sequence of Results.
func (s *MachineServer) ServerStreamingExecute(instructions *machine.InstructionSet, stream machine.Machine_ServerStreamingExecuteServer) error {
return status.Error(codes.Unimplemented, "ServerStreamingExecute() not implemented yet")
}
source: server/machine.go

Before we implement the ServerStreamingExecute() RPC, let's write a Fibonacci series generator called FibonacciRange().

package utilsfunc FibonacciRange(n int) <-chan int {
ch := make(chan int)
fn := make([]int, n+1, n+2)
fn[0] = 0
fn[1] = 1
go func() {
defer close(ch)
for i := 0; i <= n; i++ {
var f int
if i < 2 {
f = fn[i]
} else {
f = fn[i-1] + fn[i-2]
}
fn[i] = f
ch <- f
}
}()
return ch
}
source: utils/fibonacci.go

The blog series assumes that you’re familiar with Golang basics & its concurrency paradigms & concepts like Channels. You can read more about the Channels from the official document.

This function yields the numbers of Fibonacci series till the Nth position.

Let’s also add a small unit test to validate the FibonacciRange() generator.

package utilsimport (
"testing"
)
func TestFibonacciRange(t *testing.T) {
fibOf5 := []int{0, 1, 1, 2, 3, 5}
i := 0
for f := range FibonacciRange(5) {
if f != fibOf5[i] {
t.Errorf("got %d, want %d", f, fibOf5[i])
}
i++
}
}
source: utils/fibonacci_test.go

Let’s implement ServerStreamingExecute() to handle the basic instructions PUSH/POP, and FIB with proper error handling. On completion of the execution of instructions set, it should POP the result from the Stack and should respond with a Result object to the client.

func (s *MachineServer) ServerStreamingExecute(instructions *machine.InstructionSet, stream machine.Machine_ServerStreamingExecuteServer) error {
if len(instructions.GetInstructions()) == 0 {
return status.Error(codes.InvalidArgument, "No valid instructions received")
}
var stack stack.Stack for _, instruction := range instructions.GetInstructions() {
operand := instruction.GetOperand()
operator := instruction.GetOperator()
op_type := OperatorType(operator)
log.Printf("Operand: %v, Operator: %v\n", operand, operator) switch op_type {
case PUSH:
stack.Push(float32(operand))
case POP:
stack.Pop()
case FIB:
n, popped := stack.Pop()
if !popped {
return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted")
}
if op_type == FIB {
for f := range utils.FibonacciRange(int(n)) {
log.Println(float32(f))
stream.Send(&machine.Result{Output: float32(f)})
}
}
default:
return status.Errorf(codes.Unimplemented, "Operation '%s' not implemented yet", operator)
}
}
return nil
}
source: server/machine.go

Update the Client

func runServerStreamingExecute(client machine.MachineClient, instructions *machine.InstructionSet) {
log.Printf("Executing %v", instructions)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.ServerStreamingExecute(ctx, instructions)
if err != nil {
log.Fatalf("%v.Execute(_) = _, %v: ", client, err)
}
for {
result, err := stream.Recv()
if err == io.EOF {
log.Println("EOF")
break
}
if err != nil {
log.Printf("Err: %v", err)
break
}
log.Printf("output: %v", result.GetOutput())
}
log.Println("DONE!")
}
source: client/machine.go

Test

Server

~/disk/E/workspace/grpc-eg-go
$ mockgen github.com/toransahu/grpc-eg-go/machine MachineClient,Machine_ServerStreamingExecuteServer > mock_machine/machine_mock.go

The updated mock_machine/machine_mock.go should look like this.

Now, we’re good to write unit test for server-side streaming RPC ServerStreamingExecute():

func TestServerStreamingExecute(t *testing.T) {
s := MachineServer{}
// set up test table
tests := []struct {
instructions []*machine.Instruction
want []float32
}{
{
instructions: []*machine.Instruction{
{Operand: 5, Operator: "PUSH"},
{Operator: "FIB"},
},
want: []float32{0, 1, 1, 2, 3, 5},
},
{
instructions: []*machine.Instruction{
{Operand: 6, Operator: "PUSH"},
{Operator: "FIB"},
},
want: []float32{0, 1, 1, 2, 3, 5, 8},
},
}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockServerStream := mock_machine.NewMockMachine_ServerStreamingExecuteServer(ctrl)
for _, tt := range tests {
mockResults := []*machine.Result{}
mockServerStream.EXPECT().Send(gomock.Any()).DoAndReturn(
func(result *machine.Result) error {
mockResults = append(mockResults, result)
return nil
}).AnyTimes()
req := &machine.InstructionSet{Instructions: tt.instructions} err := s.ServerStreamingExecute(req, mockServerStream)
if err != nil {
t.Errorf("ServerStreamingExecute(%v) got unexpected error: %v", req, err)
}
for i, result := range mockResults {
got := result.GetOutput()
want := tt.want[i]
if got != want {
t.Errorf("got %v, want %v", got, want)
}
}
}
}
source: server/machine_test.go

Let’s run the unit test:

~/disk/E/workspace/grpc-eg-go
$ go test server/machine.go server/machine_test.go
ok command-line-arguments 0.003s

Client

~/disk/E/workspace/grpc-eg-go
$ mockgen github.com/toransahu/grpc-eg-go/machine MachineClient,Machine_ServerStreamingExecuteServer,Machine_ServerStreamingExecuteClient > mock_machine/machine_mock.go
source: mock_machine/machine_mock.go

Let’s add unit test to test client-side logic for server-side streaming RPC ServerStreamingExecute() using mock MockMachine_ServerStreamingExecuteClient :

func TestServerStreamingExecute(t *testing.T) {
instructions := []*machine.Instruction{}
instructions = append(instructions, &machine.Instruction{Operand: 1, Operator: "PUSH"})
instructions = append(instructions, &machine.Instruction{Operator: "FIB"})
instructionSet := &machine.InstructionSet{Instructions: instructions}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockMachineClient := mock_machine.NewMockMachineClient(ctrl)
clientStream := mock_machine.NewMockMachine_ServerStreamingExecuteClient(ctrl)
clientStream.EXPECT().Recv().Return(&machine.Result{Output: 0}, nil) mockMachineClient.EXPECT().ServerStreamingExecute(
gomock.Any(), // context
instructionSet, // rpc uniary message
).Return(clientStream, nil)
if err := testServerStreamingExecute(t, mockMachineClient, instructionSet); err != nil {
t.Fatalf("Test failed: %v", err)
}
}
source: mock_machine/machine_mock_test.go

Let’s run the unit test:

~/disk/E/workspace/grpc-eg-go
$ go test mock_machine/machine_mock.go mock_machine/machine_mock_test.go
ok command-line-arguments 0.003s

Run

Server

~/disk/E/workspace/grpc-eg-go
$ go run cmd/run_machine_server.go

Client

~/disk/E/workspace/grpc-eg-go
$ go run client/machine.go
Executing instructions:<operator:"PUSH" operand:5 > instructions:<operator:"PUSH" operand:6 > instructions:<operator:"MUL" >
output:30
Executing instructions:<operator:"PUSH" operand:6 > instructions:<operator:"FIB" >
output: 0
output: 1
output: 1
output: 2
output: 3
output: 5
output: 8
EOF
DONE!

Awesome! A Server Streaming RPC has been successfully implemented.

Implementing Client Streaming RPC

Update the protobuf

service Machine {
- rpc Execute(InstructionSet) returns (Result) {}
+ rpc Execute(stream Instruction) returns (Result) {}
rpc ServerStreamingExecute(InstructionSet) returns (stream Result) {}
}
source: machine/machine.proto

Generating the updated client and server interface Go code

~/disk/E/workspace/grpc-eg-go
$ SRC_DIR=./
$ DST_DIR=$SRC_DIR
$ protoc \
-I=$SRC_DIR \
--go_out=plugins=grpc:$DST_DIR \
$SRC_DIR/machine/machine.proto

You’ll notice that declaration of Execute() has been updated from MachineServer & MachineClient interfaces.

type MachineServer interface {
- Execute(context.Context, *InstructionSet) (*Result, error)
+ Execute(Machine_ExecuteServer) error
ServerStreamingExecute(*InstructionSet, Machine_ServerStreamingExecuteServer) error
}
type MachineClient interface {
- Execute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (*Result, error)
+ Execute(ctx context.Context, opts ...grpc.CallOption) (Machine_ExecuteClient, error)
ServerStreamingExecute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (Machine_ServerStreamingExecuteClient, error)
}
source: machine/machine.pb.go

Update the Server

func (s *MachineServer) Execute(stream machine.Machine_ExecuteServer) error {
var stack stack.Stack
for {
instruction, err := stream.Recv()
if err == io.EOF {
log.Println("EOF")
output, popped := stack.Pop()
if !popped {
return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted")
}
if err := stream.SendAndClose(&machine.Result{
Output: output,
}); err != nil {
return err
}
return nil
}
if err != nil {
return err
}
operand := instruction.GetOperand()
operator := instruction.GetOperator()
op_type := OperatorType(operator)
fmt.Printf("Operand: %v, Operator: %v\n", operand, operator) switch op_type {
case PUSH:
stack.Push(float32(operand))
case POP:
stack.Pop()
case ADD, SUB, MUL, DIV:
item2, popped := stack.Pop()
item1, popped := stack.Pop()
if !popped {
return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted")
}
if op_type == ADD {
stack.Push(item1 + item2)
} else if op_type == SUB {
stack.Push(item1 - item2)
} else if op_type == MUL {
stack.Push(item1 * item2)
} else if op_type == DIV {
stack.Push(item1 / item2)
}
default:
return status.Errorf(codes.Unimplemented, "Operation '%s' not implemented yet", operator)
}
}
}
source: server/machine.go

Update the Client

func runExecute(client machine.MachineClient, instructions *machine.InstructionSet) {
log.Printf("Streaming %v", instructions)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.Execute(ctx)
if err != nil {
log.Fatalf("%v.Execute(ctx) = %v, %v: ", client, stream, err)
}
for _, instruction := range instructions.GetInstructions() {
if err := stream.Send(instruction); err != nil {
log.Fatalf("%v.Send(%v) = %v: ", stream, instruction, err)
}
}
result, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
log.Println(result)
}
source: client/machine.go

Test

~/disk/E/workspace/grpc-eg-go
$ mockgen github.com/toransahu/grpc-eg-go/machine MachineClient,Machine_ServerStreamingExecuteClient,Machine_ServerStreamingExecuteServer,Machine_ExecuteServer,Machine_ExecuteClient > mock_machine/machine_mock.go

The updated mock_machine/machine_mock.go should look like this.

Server

func TestExecute(t *testing.T) {
s := MachineServer{}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockServerStream := mock_machine.NewMockMachine_ExecuteServer(ctrl)
mockResult := &machine.Result{}
callRecv1 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operand: 5, Operator: "PUSH"}, nil)
callRecv2 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operand: 6, Operator: "PUSH"}, nil).After(callRecv1)
callRecv3 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operator: "MUL"}, nil).After(callRecv2)
mockServerStream.EXPECT().Recv().Return(nil, io.EOF).After(callRecv3)
mockServerStream.EXPECT().SendAndClose(gomock.Any()).DoAndReturn(
func(result *machine.Result) error {
mockResult = result
return nil
})
err := s.Execute(mockServerStream)
if err != nil {
t.Errorf("Execute(%v) got unexpected error: %v", mockServerStream, err)
}
got := mockResult.GetOutput()
want := float32(30)
if got != want {
t.Errorf("got %v, wanted %v", got, want)
}
}
source: server/machine_test.go

Let’s run the unit test:

~/disk/E/workspace/grpc-eg-go
$ go test server/machine.go server/machine_test.go
ok command-line-arguments 0.003s

Client

func TestExecute(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockMachineClient := mock_machine.NewMockMachineClient(ctrl)
mockClientStream := mock_machine.NewMockMachine_ExecuteClient(ctrl)
mockClientStream.EXPECT().Send(gomock.Any()).Return(nil).AnyTimes()
mockClientStream.EXPECT().CloseAndRecv().Return(&machine.Result{Output: 30}, nil)
mockMachineClient.EXPECT().Execute(
gomock.Any(), // context
).Return(mockClientStream, nil)
testExecute(t, mockMachineClient)
}
source: mock_machine/machine_mock_test.go

Let’s run the unit test:

~/disk/E/workspace/grpc-eg-go
$ go test mock_machine/machine_mock.go mock_machine/machine_mock_test.go
ok command-line-arguments 0.003s

Run all the unit tests at once:

~/disk/E/workspace/grpc-eg-go
$ go test ./...
? github.com/toransahu/grpc-eg-go/client [no test files]
? github.com/toransahu/grpc-eg-go/cmd [no test files]
? github.com/toransahu/grpc-eg-go/machine [no test files]
ok github.com/toransahu/grpc-eg-go/mock_machine (cached)
ok github.com/toransahu/grpc-eg-go/server (cached)
ok github.com/toransahu/grpc-eg-go/utils (cached)
? github.com/toransahu/grpc-eg-go/utils/stack [no test files]

Run

Server

~/disk/E/workspace/grpc-eg-go
$ go run cmd/run_machine_server.go

Client

~/disk/E/workspace/grpc-eg-go
$ go run client/machine.go
Streaming instructions:<operator:"PUSH" operand:5 > instructions:<operator:"PUSH" operand:6 > instructions:<operator:"MUL" >
output:30
Executing instructions:<operator:"PUSH" operand:6 > instructions:<operator:"FIB" >
output: 0
output: 1
output: 1
output: 2
output: 3
output: 5
output: 8
EOF
DONE!

Awesome!! We have successfully transformed a Unary RPC into Server Streaming RPC.

At the end of this blog, we’ve learned:

  • How to define an interface for uni-directional streaming RPCs using protobuf
  • How to write gRPC server & client logic for uni-directional streaming RPCs
  • How to write and run the unit test for server-streaming & client-streaming RPCs
  • How to run the gRPC server and a client can communicate to it

The source code of this example is available at toransahu/grpc-eg-go.
You can also git checkout to this commit SHA for “Implementing Server Streaming RPC” and to this commit SHA for “Implementing Client Streaming RPC”.

See you in the next part of this blog series.

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store