Skip to content

Commit

Permalink
Add examples (#4)
Browse files Browse the repository at this point in the history
This PR mainly adds examples. It also includes some minor bug fixes:

Calls (*nats.Conn).Flush() after subscribing to make sure the subscription takes effect.
  • Loading branch information
huangjunwen authored Aug 11, 2020
1 parent e39e864 commit 1ceb337
Show file tree
Hide file tree
Showing 29 changed files with 1,908 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
examples/docker-services/mysql/data/
examples/sendmail/client/client
examples/sendmail/server/server
examples/sendmail/server/conf.json
examples/bench-natsrpc/bench-natsrpc
3 changes: 3 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Examples

`docker-services` contains external services (e.g. `nats-streaming-server`/`mysql`/`jaeger` ...) used by examples, need `docker` and `docker-compose` to run.
5 changes: 5 additions & 0 deletions examples/bench-natsrpc/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Build the bench-natsrpc binary from the Go package in this directory.
build:
go build

# Remove the built binary (name matches the directory / .gitignore entry).
clean:
rm -f bench-natsrpc
21 changes: 21 additions & 0 deletions examples/bench-natsrpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
### Bench Natsrpc

This program is used to benchmark natsrpc's performance under different parameters. See the command line options for details.

Some examples:

#### 100 go routines and payload of 1 byte

![p100_l1](./imgs/p100_l1.png)

#### 1000 go routines and payload of 1 byte

![p1000_l1](./imgs/p1000_l1.png)

#### 100 go routines and payload of 1000 bytes

![p100_l1000](./imgs/p100_l1000.png)

#### 1000 go routines and payload of 1000 bytes

![p1000_l1000](./imgs/p1000_l1000.png)
Binary file added examples/bench-natsrpc/imgs/p1000_l1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added examples/bench-natsrpc/imgs/p1000_l1000.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added examples/bench-natsrpc/imgs/p100_l1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added examples/bench-natsrpc/imgs/p100_l1000.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
277 changes: 277 additions & 0 deletions examples/bench-natsrpc/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
package main

import (
"context"
"flag"
"fmt"
"math/rand"
"os"
"runtime/pprof"
"sort"
"sync"
"time"

"github.com/codahale/hdrhistogram"
"github.com/huangjunwen/golibs/logr"
"github.com/huangjunwen/golibs/logr/zerologr"
"github.com/juju/ratelimit"
"github.com/nats-io/nats.go"
"github.com/rs/zerolog"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/huangjunwen/nproto/v2/natsrpc"
nprpc "github.com/huangjunwen/nproto/v2/rpc"
)

// NATS server address used by the benchmark. The docker-compose services
// run with network_mode: host, so the server is reachable on localhost at
// the default NATS port.
const (
// See docker-compose.yml.
natsHost = "localhost"
natsPort = 4222
)

// Command line options; registered with the flag package in main.
var (
payloadLen int // -l: payload length in bytes of each echo request
rpcNum int // -n: total number of RPC calls (rounded down to a multiple of parallel)
clientNum int // -c: number of clients (one nats connection each)
serverNum int // -s: number of servers (one nats connection each)
parallel int // -p: number of goroutines issuing RPCs
callRate int // -r: rate limit on calls per second
timeoutSec int // -t: per-call RPC timeout in seconds
cpuProfile string // -cpu: if non-empty, write a CPU profile to this file
)

func main() {
flag.IntVar(&payloadLen, "l", 1000, "Payload length.")
flag.IntVar(&clientNum, "c", 10, "Client number.")
flag.IntVar(&serverNum, "s", 10, "Client number.")
flag.IntVar(&rpcNum, "n", 10000, "Total RPC number.")
flag.IntVar(&parallel, "p", 10, "Number of go routines to invoke RPCs.")
flag.IntVar(&callRate, "r", 100000, "Call rate limit per second.")
flag.IntVar(&timeoutSec, "t", 3, "RPC timeout in seconds.")
flag.StringVar(&cpuProfile, "cpu", "", "CPU profile file name.")
flag.Parse()

var logger logr.Logger
{
out := zerolog.NewConsoleWriter()
out.TimeFormat = time.RFC3339Nano
out.Out = os.Stderr
lg := zerolog.New(&out).With().Timestamp().Logger()
logger = (*zerologr.Logger)(&lg)
}

var err error
defer func() {
if e := recover(); e != nil {
err, ok := e.(error)
if !ok {
err = fmt.Errorf("%+v", e)
}
logger.Error(err, "panic", true)
}
}()

// Prepare rpc params.
spec := nprpc.MustRPCSpec(
"bench-natsrpc",
"echo",
func() interface{} {
return wrapperspb.Bytes(nil)
},
func() interface{} {
return wrapperspb.Bytes(nil)
},
)
handler := func(ctx context.Context, input interface{}) (interface{}, error) {
return input, nil
}
var input *wrapperspb.BytesValue
{
data := make([]byte, payloadLen)
if _, err = rand.Read(data); err != nil {
panic(err)
}
input = wrapperspb.Bytes(data)
}

// Prepare bench params.
rpcNumPerGoroutine := rpcNum / parallel
rpcNumActual := rpcNumPerGoroutine * parallel
timeout := time.Duration(timeoutSec) * time.Second
durations := make([]time.Duration, rpcNumActual)
rl := ratelimit.NewBucketWithRate(float64(callRate), int64(parallel))
logger.Info("Payload Len(-l) :", "value", payloadLen)
logger.Info("RPC Num(-n) :", "value", rpcNumActual)
logger.Info("Parallel(-p) :", "value", parallel)
logger.Info("Call Rate Limit(-r) :", "value", callRate)
logger.Info("Timeout(-t) :", "value", timeout.String())

// Start serverNum servers.
for i := 0; i < serverNum; i++ {

// nats connection.
var nc *nats.Conn
{
opts := nats.GetDefaultOptions()
opts.Url = fmt.Sprintf("nats://%s:%d", natsHost, natsPort)
opts.MaxReconnect = -1
nc, err = opts.Connect()
if err != nil {
panic(err)
}
defer nc.Close()
}

var sc *natsrpc.ServerConn
{
sc, err = natsrpc.NewServerConn(
nc,
natsrpc.SCOptLogger(logger),
)
if err != nil {
panic(err)
}
defer sc.Close()
}

server := natsrpc.NewPbJsonServer(sc)
if err = server.RegistHandler(spec, handler); err != nil {
panic(err)
}
}
logger.Info("Servers ready", "serverNum", serverNum)

// Make clientNum clients.
handlers := make([]nprpc.RPCHandler, 0, clientNum)
for i := 0; i < clientNum; i++ {

// nats connection.
var nc *nats.Conn
{
opts := nats.GetDefaultOptions()
opts.Url = fmt.Sprintf("nats://%s:%d", natsHost, natsPort)
opts.MaxReconnect = -1
nc, err = opts.Connect()
if err != nil {
panic(err)
}
defer nc.Close()
}

var cc *natsrpc.ClientConn
{
cc, err = natsrpc.NewClientConn(
nc,
natsrpc.CCOptTimeout(2*time.Second),
)
if err != nil {
panic(err)
}
}

client := natsrpc.NewPbJsonClient(cc)
handler := client.MakeHandler(spec)
handlers = append(handlers, handler)
}
logger.Info("Clients ready", "clientNum", clientNum)

// Prof.
if cpuProfile != "" {
f, err := os.Create(cpuProfile)
if err != nil {
panic(err)
}
defer f.Close()

if err := pprof.StartCPUProfile(f); err != nil {
panic(err)
}
}

// Start.
wg := &sync.WaitGroup{}
wg.Add(parallel)
elapseStart := time.Now()
for i := 0; i < parallel; i++ {
go func(i int) {
defer wg.Done()

// Choose one client.
handler := handlers[i%clientNum]
// Filling durations[offset: offset+rpcNumPerGoroutine]
offset := i * rpcNumPerGoroutine

for j := 0; j < rpcNumPerGoroutine; j++ {
func(j int) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

rl.Wait(1)
callStart := time.Now()
_, err := handler(ctx, input)
if err != nil {
panic(err)
}
durations[offset+j] = time.Since(callStart)
}(j)
}

}(i)
}

logger.Info("===Waiting===")
wg.Wait()

// Post calculation.
elapse := time.Since(elapseStart)
if cpuProfile != "" {
pprof.StopCPUProfile()
}
sort.Slice(durations, func(i, j int) bool {
return durations[i] < durations[j]
})

h := hdrhistogram.New(int64(durations[0]), int64(durations[len(durations)-1]), 5)
for _, d := range durations {
h.RecordValue(int64(d))
}
avg := time.Duration(h.Mean())

// http://vanillajava.blogspot.com/2012/04/what-is-latency-throughput-and-degree.html
//
// | <------------------ 1 second -------------------> |
// | ____ ____ ____ ____ ____ ____ ____ ____ ____ ____ |
// | | | | | | | | | | | | | | | | | | | | | |
// | ---- ---- ---- ---- ---- ---- ---- ---- ---- ---- |
// | ____ ____ ____ ____ ____ ____ ____ ____ ____ ____ |
// | | | | | | | | | | | | | | | | | | | | | |
// | ---- ---- ---- ---- ---- ---- ---- ---- ---- ---- |
// | ____ ____ ____ ____ ____ ____ ____ ____ ____ ____ |
// | | | | | | | | | | | | | | | | | | | | | |
// | ---- ---- ---- ---- ---- ---- ---- ---- ---- ---- |
//
// For example:
// suppose Throughput == 30 (30 tasks finished in 1 second),
// and Latency == 0.1 s (10 tasks can be handled in sequence in 1 second),
// then Concurrency = Throughput / (1/Latency) = 30 / (1/0.1) = 30 * 0.1 = 3
throughput := float64(rpcNumActual) / elapse.Seconds()
concurrency := throughput * avg.Seconds()

logger.Info("Elapse :", "value", elapse.String())
logger.Info("Avg Latency :", "value", avg.String())
logger.Info("Throughput (=RPC Num/Elapse) :", "value", throughput)
logger.Info("Concurrency (=Throughput*Avg Latency) :", "value", concurrency)
logger.Info("10 :", "value", time.Duration(h.ValueAtQuantile(10)).String())
logger.Info("50 :", "value", time.Duration(h.ValueAtQuantile(50)).String())
logger.Info("75 :", "value", time.Duration(h.ValueAtQuantile(75)).String())
logger.Info("80 :", "value", time.Duration(h.ValueAtQuantile(80)).String())
logger.Info("85 :", "value", time.Duration(h.ValueAtQuantile(85)).String())
logger.Info("90 :", "value", time.Duration(h.ValueAtQuantile(90)).String())
logger.Info("95 :", "value", time.Duration(h.ValueAtQuantile(95)).String())
logger.Info("99 :", "value", time.Duration(h.ValueAtQuantile(99)).String())
logger.Info("99.9 :", "value", time.Duration(h.ValueAtQuantile(99.9)).String())
logger.Info("99.99 :", "value", time.Duration(h.ValueAtQuantile(99.99)).String())
logger.Info("99.999 :", "value", time.Duration(h.ValueAtQuantile(99.999)).String())
logger.Info("100 :", "value", time.Duration(h.ValueAtQuantile(100)).String())

}
8 changes: 8 additions & 0 deletions examples/docker-services/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Start all example services (mysql, nats-streaming, jaeger) in the background.
up:
docker-compose up -d

# Stop and remove the example service containers.
down:
docker-compose down

# Remove the mysql data directory; sudo because the container writes it as root.
clean:
sudo rm -r mysql/data
60 changes: 60 additions & 0 deletions examples/docker-services/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
version: "3.1"

services:

mysql:
image: "mysql:8.0.19"
container_name: "example_mysql"
command:
- "--character-set-server=utf8mb4"
- "--collation-server=utf8mb4_unicode_ci"
# - "--general-log=1"
- "--general-log-file=/var/lib/mysql/general-query.log"
# Turns on binlog with row format, and records full meta data (MySQL 8+).
- "--log-bin=/var/lib/mysql/binlog"
- "--server-id=1"
- "--max-binlog-size=100M"
- "--binlog-format=ROW"
- "--binlog-row-image=full"
- "--binlog-row-metadata=full"
# Use GTID mode.
- "--gtid-mode=ON"
- "--enforce-gtid-consistency=ON"
# Turns off performance_schema to reduce memory usage.
- "--performance-schema=off"
volumes:
- "./mysql/data:/var/lib/mysql"
- "./mysql/init:/docker-entrypoint-initdb.d"
environment:
MYSQL_ROOT_PASSWORD: "123456"
TZ: "Asia/Shanghai"
LANG: "C.UTF-8"
# ports:
# - "127.0.0.1:3306:3306"
network_mode: host
restart: on-failure

stan:
image: "nats-streaming:0.18-alpine"
container_name: "example_stan"
command:
- "-m"
- "8222"
- "-sc"
- "/stan-conf/stan.conf"
volumes:
- "./stan/conf:/stan-conf"
# ports:
# - "127.0.0.1:4222:4222"
# - "127.0.0.1:8222:8222"
network_mode: host
restart: on-failure

jaeger:
image: "jaegertracing/all-in-one:1.18.0"
container_name: "example_jaeger"
# ports:
# - "127.0.0.1:16686:16686"
# - "127.0.0.1:14268:14268"
network_mode: host
restart: on-failure
Loading

0 comments on commit 1ceb337

Please sign in to comment.