Skip to content

Commit

Permalink
feat(perf): continuosly measure on single conn (iperf-style) (#276)
Browse files Browse the repository at this point in the history
Our current throughput tests open a connection, open a stream,
up- or download 100MB and close the connection. 100 MB is not enough on the
given path (60ms, ~5gbit/s) to exit congestion controller's slow-start. See
#261 for details.

Instead of downloading 100MB multiple times, each on a new connection, establish
a single connection and continuously measure the throughput for a fixed
duration (20s).
  • Loading branch information
mxinden authored Oct 25, 2023
1 parent 9247c9f commit 0a8dbab
Show file tree
Hide file tree
Showing 16 changed files with 32,579 additions and 1,738 deletions.
34 changes: 28 additions & 6 deletions perf/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,33 @@ Given you have provisioned your infrastructure, you can now build and run the li
- `--download-bytes` number of bytes to download per stream.
- Output
- Logging MUST go to `stderr`.
- Measurement output is printed to `stdout` as JSON in the form of:
```json
{"latency": 0.246442851}
```
Note that the measurement includes the time to (1) establish the
connection, (2) upload the bytes and (3) download the bytes.
- Measurement output is printed to `stdout` as JSON.
- The output schema is:
``` typescript
interface Data {
type: "intermediary" | "final";
timeSeconds: number;
uploadBytes: number;
downloadBytes: number;
}
```
- Every second the client must print the current progress to stdout. See example below. Note the `type: "intermediary"`.
``` json
{
"type": "intermediary",
"timeSeconds": 1.004957645,
"uploadBytes": 73039872,
"downloadBytes": 0
},
```
- Before terminating the client must print a final summary. See example below. Note the `type: "final"`. Also note that the measurement includes the time to (1) establish the connection, (2) upload the bytes and (3) download the bytes.
``` json
{
"type": "final",
"timeSeconds": 60.127230659,
"uploadBytes": 4382392320,
"downloadBytes": 0
}
```
2. For a new implementation, in [`impl/Makefile` include your implementation in the `all` target.](./impl/Makefile#L7)
3. For a new version, reference version in [`runner/src/versions.ts`](./runner/src/versions.ts#L7-L43).
10 changes: 8 additions & 2 deletions perf/impl/go-libp2p/v0.27/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@ func main() {
}

jsonB, err := json.Marshal(Result{
Latency: time.Since(start).Seconds(),
TimeSeconds: time.Since(start).Seconds(),
UploadBytes: *uploadBytes,
DownloadBytes: *downloadBytes,
Type: "final",
})
if err != nil {
log.Fatalf("failed to marshal perf result: %s", err)
Expand All @@ -99,7 +102,10 @@ func main() {
}

type Result struct {
Latency float64 `json:"latency"`
Type string `json:"type"`
TimeSeconds float64 `json:"timeSeconds"`
UploadBytes uint64 `json:"uploadBytes"`
DownloadBytes uint64 `json:"downloadBytes"`
}

type simpleReader struct {
Expand Down
57 changes: 56 additions & 1 deletion perf/impl/go-libp2p/v0.27/perf.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package main
import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"time"

logging "github.com/ipfs/go-log/v2"
pool "github.com/libp2p/go-buffer-pool"
Expand Down Expand Up @@ -89,7 +91,26 @@ func sendBytes(s io.Writer, bytesToSend uint64) error {
buf := pool.Get(blockSize)
defer pool.Put(buf)

lastReportTime := time.Now()
lastReportWrite := uint64(0)

for bytesToSend > 0 {
now := time.Now()
if now.Sub(lastReportTime) >= time.Second {
jsonB, err := json.Marshal(Result{
TimeSeconds: now.Sub(lastReportTime).Seconds(),
UploadBytes: lastReportWrite,
Type: "intermediary",
})
if err != nil {
log.Fatalf("failed to marshal perf result: %s", err)
}
fmt.Println(string(jsonB))

lastReportTime = now
lastReportWrite = 0
}

toSend := buf
if bytesToSend < blockSize {
toSend = buf[:bytesToSend]
Expand All @@ -100,15 +121,49 @@ func sendBytes(s io.Writer, bytesToSend uint64) error {
return err
}
bytesToSend -= uint64(n)
lastReportWrite += uint64(n)
}
return nil
}

func drainStream(s io.Reader) (uint64, error) {
var recvd int64
recvd, err := io.Copy(io.Discard, s)
recvd, err := io.Copy(io.Discard, &reportingReader{orig: s, LastReportTime: time.Now()})
if err != nil && err != io.EOF {
return uint64(recvd), err
}
return uint64(recvd), nil
}

type reportingReader struct {
orig io.Reader
LastReportTime time.Time
lastReportRead uint64
}

var _ io.Reader = &reportingReader{}

func (r *reportingReader) Read(b []byte) (int, error) {
n, err := r.orig.Read(b)
r.lastReportRead += uint64(n)

now := time.Now()
if now.Sub(r.LastReportTime) >= time.Second {
result := Result{
TimeSeconds: now.Sub(r.LastReportTime).Seconds(),
Type: "intermediary",
DownloadBytes: r.lastReportRead,
}

jsonB, err := json.Marshal(result)
if err != nil {
log.Fatalf("failed to marshal perf result: %s", err)
}
fmt.Println(string(jsonB))

r.LastReportTime = now
r.lastReportRead = 0
}

return n, err
}
10 changes: 8 additions & 2 deletions perf/impl/go-libp2p/v0.28/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@ func main() {
}

jsonB, err := json.Marshal(Result{
Latency: time.Since(start).Seconds(),
TimeSeconds: time.Since(start).Seconds(),
UploadBytes: *uploadBytes,
DownloadBytes: *downloadBytes,
Type: "final",
})
if err != nil {
log.Fatalf("failed to marshal perf result: %s", err)
Expand All @@ -99,7 +102,10 @@ func main() {
}

type Result struct {
Latency float64 `json:"latency"`
Type string `json:"type"`
TimeSeconds float64 `json:"timeSeconds"`
UploadBytes uint64 `json:"uploadBytes"`
DownloadBytes uint64 `json:"downloadBytes"`
}

type simpleReader struct {
Expand Down
57 changes: 56 additions & 1 deletion perf/impl/go-libp2p/v0.28/perf.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package main
import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"time"

logging "github.com/ipfs/go-log/v2"
pool "github.com/libp2p/go-buffer-pool"
Expand Down Expand Up @@ -89,7 +91,26 @@ func sendBytes(s io.Writer, bytesToSend uint64) error {
buf := pool.Get(blockSize)
defer pool.Put(buf)

lastReportTime := time.Now()
lastReportWrite := uint64(0)

for bytesToSend > 0 {
now := time.Now()
if now.Sub(lastReportTime) >= time.Second {
jsonB, err := json.Marshal(Result{
TimeSeconds: now.Sub(lastReportTime).Seconds(),
UploadBytes: lastReportWrite,
Type: "intermediary",
})
if err != nil {
log.Fatalf("failed to marshal perf result: %s", err)
}
fmt.Println(string(jsonB))

lastReportTime = now
lastReportWrite = 0
}

toSend := buf
if bytesToSend < blockSize {
toSend = buf[:bytesToSend]
Expand All @@ -100,15 +121,49 @@ func sendBytes(s io.Writer, bytesToSend uint64) error {
return err
}
bytesToSend -= uint64(n)
lastReportWrite += uint64(n)
}
return nil
}

func drainStream(s io.Reader) (uint64, error) {
var recvd int64
recvd, err := io.Copy(io.Discard, s)
recvd, err := io.Copy(io.Discard, &reportingReader{orig: s, LastReportTime: time.Now()})
if err != nil && err != io.EOF {
return uint64(recvd), err
}
return uint64(recvd), nil
}

type reportingReader struct {
orig io.Reader
LastReportTime time.Time
lastReportRead uint64
}

var _ io.Reader = &reportingReader{}

func (r *reportingReader) Read(b []byte) (int, error) {
n, err := r.orig.Read(b)
r.lastReportRead += uint64(n)

now := time.Now()
if now.Sub(r.LastReportTime) >= time.Second {
result := Result{
TimeSeconds: now.Sub(r.LastReportTime).Seconds(),
Type: "intermediary",
DownloadBytes: r.lastReportRead,
}

jsonB, err := json.Marshal(result)
if err != nil {
log.Fatalf("failed to marshal perf result: %s", err)
}
fmt.Println(string(jsonB))

r.LastReportTime = now
r.lastReportRead = 0
}

return n, err
}
10 changes: 8 additions & 2 deletions perf/impl/go-libp2p/v0.29/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@ func main() {
}

jsonB, err := json.Marshal(Result{
Latency: time.Since(start).Seconds(),
TimeSeconds: time.Since(start).Seconds(),
UploadBytes: *uploadBytes,
DownloadBytes: *downloadBytes,
Type: "final",
})
if err != nil {
log.Fatalf("failed to marshal perf result: %s", err)
Expand All @@ -99,7 +102,10 @@ func main() {
}

type Result struct {
Latency float64 `json:"latency"`
Type string `json:"type"`
TimeSeconds float64 `json:"timeSeconds"`
UploadBytes uint64 `json:"uploadBytes"`
DownloadBytes uint64 `json:"downloadBytes"`
}

type simpleReader struct {
Expand Down
57 changes: 56 additions & 1 deletion perf/impl/go-libp2p/v0.29/perf.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package main
import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"time"

logging "github.com/ipfs/go-log/v2"
pool "github.com/libp2p/go-buffer-pool"
Expand Down Expand Up @@ -89,7 +91,26 @@ func sendBytes(s io.Writer, bytesToSend uint64) error {
buf := pool.Get(blockSize)
defer pool.Put(buf)

lastReportTime := time.Now()
lastReportWrite := uint64(0)

for bytesToSend > 0 {
now := time.Now()
if now.Sub(lastReportTime) >= time.Second {
jsonB, err := json.Marshal(Result{
TimeSeconds: now.Sub(lastReportTime).Seconds(),
UploadBytes: lastReportWrite,
Type: "intermediary",
})
if err != nil {
log.Fatalf("failed to marshal perf result: %s", err)
}
fmt.Println(string(jsonB))

lastReportTime = now
lastReportWrite = 0
}

toSend := buf
if bytesToSend < blockSize {
toSend = buf[:bytesToSend]
Expand All @@ -100,15 +121,49 @@ func sendBytes(s io.Writer, bytesToSend uint64) error {
return err
}
bytesToSend -= uint64(n)
lastReportWrite += uint64(n)
}
return nil
}

func drainStream(s io.Reader) (uint64, error) {
var recvd int64
recvd, err := io.Copy(io.Discard, s)
recvd, err := io.Copy(io.Discard, &reportingReader{orig: s, LastReportTime: time.Now()})
if err != nil && err != io.EOF {
return uint64(recvd), err
}
return uint64(recvd), nil
}

type reportingReader struct {
orig io.Reader
LastReportTime time.Time
lastReportRead uint64
}

var _ io.Reader = &reportingReader{}

func (r *reportingReader) Read(b []byte) (int, error) {
n, err := r.orig.Read(b)
r.lastReportRead += uint64(n)

now := time.Now()
if now.Sub(r.LastReportTime) >= time.Second {
result := Result{
TimeSeconds: now.Sub(r.LastReportTime).Seconds(),
Type: "intermediary",
DownloadBytes: r.lastReportRead,
}

jsonB, err := json.Marshal(result)
if err != nil {
log.Fatalf("failed to marshal perf result: %s", err)
}
fmt.Println(string(jsonB))

r.LastReportTime = now
r.lastReportRead = 0
}

return n, err
}
Loading

0 comments on commit 0a8dbab

Please sign in to comment.