Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf protocol #2161

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions p2p/protocol/perf/perf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package perf

import (
"context"
"encoding/binary"
"fmt"
"io"

logging "github.com/ipfs/go-log/v2"
pool "github.com/libp2p/go-buffer-pool"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
)

var log = logging.Logger("perf")

const (
BlockSize = 64 << 10

ID = "/perf/1.0.0"
)

type PerfService struct {
Host host.Host
}

func NewPerfService(h host.Host) *PerfService {
ps := &PerfService{h}
h.SetStreamHandler(ID, ps.PerfHandler)
return ps
}

func (p *PerfService) PerfHandler(s network.Stream) {
buf := pool.Get(BlockSize)
defer pool.Put(buf)

u64Buf := make([]byte, 8)
_, err := io.ReadFull(s, u64Buf)
if err != nil {
log.Errorw("err", err)
s.Reset()
return
}

bytesToSend := binary.BigEndian.Uint64(u64Buf)

_, err = p.drainStream(context.Background(), s, buf)
marten-seemann marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.Errorw("err", err)
s.Reset()
return
}

err = p.sendBytes(context.Background(), s, bytesToSend, buf)
if err != nil {
log.Errorw("err", err)
s.Reset()
return
}

}

func (ps *PerfService) sendBytes(ctx context.Context, s network.Stream, bytesToSend uint64, buf []byte) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Easier to read if you allocate buf in this function.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You then have to allocate the buf twice in the send and drain paths. Yes, I know this is a pool, but you still do the put and get work.

for bytesToSend > 0 {
toSend := buf
if bytesToSend < BlockSize {
toSend = buf[:bytesToSend]
}

n, err := s.Write(toSend)
if err != nil {
return nil
}
bytesToSend -= uint64(n)
}
s.CloseWrite()

return nil
}

func (ps *PerfService) drainStream(ctx context.Context, s network.Stream, buf []byte) (uint64, error) {
var recvd uint64
for {
n, err := s.Read(buf)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can copy into io.Discard.

recvd += uint64(n)
if err == io.EOF {
return recvd, nil
} else if err != nil {
s.Reset()
return recvd, err
}
}
}

func (ps *PerfService) RunPerf(ctx context.Context, p peer.ID, bytesToSend uint64, bytesToRecv uint64) error {
s, err := ps.Host.NewStream(ctx, p, ID)
if err != nil {
return err
}

buf := pool.Get(BlockSize)
defer pool.Put(buf)

sizeBuf := make([]byte, 8)
binary.BigEndian.PutUint64(sizeBuf, bytesToRecv)

_, err = s.Write(sizeBuf)
if err != nil {

return err
}

err = ps.sendBytes(ctx, s, bytesToSend, buf)
if err != nil {
return err
}

recvd, err := ps.drainStream(ctx, s, buf)
if err != nil {
return err
}

if recvd != bytesToRecv {
return fmt.Errorf("expected to recv %d bytes, got %d", bytesToRecv, recvd)
}
return err
}
43 changes: 43 additions & 0 deletions p2p/protocol/perf/perf_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package perf

import (
"context"
"fmt"
"testing"
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
)

func TestPerf(t *testing.T) {
h1, err := libp2p.New()
require.NoError(t, err)
h2, err := libp2p.New()
require.NoError(t, err)

perf1 := NewPerfService(h1)
_ = NewPerfService(h2)

err = h1.Connect(context.Background(), peer.AddrInfo{
ID: h2.ID(),
Addrs: h2.Addrs(),
})
require.NoError(t, err)

start := time.Now()

// Warmup
err = perf1.RunPerf(context.Background(), h2.ID(), 10<<20, 10<<20)
require.NoError(t, err)

err = perf1.RunPerf(context.Background(), h2.ID(), 0, 10<<20)
require.NoError(t, err)
fmt.Println("10 MiB download took: ", time.Since(start))

err = perf1.RunPerf(context.Background(), h2.ID(), 10<<20, 0<<20)
require.NoError(t, err)
fmt.Println("10 MiB upload took: ", time.Since(start))

}