diff --git a/benchmarks/benchmark_test.go b/benchmarks/benchmark_test.go index 25eeada2..b395b7f8 100644 --- a/benchmarks/benchmark_test.go +++ b/benchmarks/benchmark_test.go @@ -3,9 +3,9 @@ package graphsync_test import ( "bytes" "context" - "crypto/rand" "fmt" "io/ioutil" + "math/rand" "os" "runtime" "strings" @@ -31,6 +31,7 @@ import ( basicnode "github.com/ipld/go-ipld-prime/node/basic" ipldselector "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipld/go-ipld-prime/traversal/selector/builder" + mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/require" ) @@ -48,10 +49,82 @@ func BenchmarkRoundtripSuccess(b *testing.B) { tdm, err := newTempDirMaker(b) require.NoError(b, err) b.Run("test-20-10000", func(b *testing.B) { - subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(10000), tdm) + subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(10000, defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel), tdm) + }) + b.Run("test-p2p-stress-10-128MB", func(b *testing.B) { + p2pStrestTest(ctx, b, 20, allFilesUniformSize(128*(1<<20), 1<<20, 1024), tdm) }) } +func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc, tdm *tempDirMaker) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + mn := mocknet.New(ctx) + mn.SetLinkDefaults(mocknet.LinkOptions{Latency: 100 * time.Millisecond, Bandwidth: 3000000}) + net := tn.StreamNet(ctx, mn) + ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm) + instances, err := ig.Instances(1 + b.N) + require.NoError(b, err) + var allCids []cid.Cid + for i := 0; i < numfiles; i++ { + thisCids := df(ctx, b, instances[:1]) + allCids = append(allCids, thisCids...) + } + ssb := builder.NewSelectorSpecBuilder(basicnode.Style.Any) + + allSelector := ssb.ExploreRecursive(ipldselector.RecursionLimitNone(), + ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node() + + runtime.GC() + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + fetcher := instances[i+1] + var wg sync.WaitGroup + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + require.NoError(b, err) + start := time.Now() + disconnectOn := rand.Intn(numfiles) + for j := 0; j < numfiles; j++ { + resultChan, errChan := fetcher.Exchange.Request(ctx, instances[0].Peer, cidlink.Link{Cid: allCids[j]}, allSelector) + + wg.Add(1) + go func(j int) { + defer wg.Done() + results := 0 + for { + select { + case <-ctx.Done(): + return + case <-resultChan: + results++ + if results == 100 && j == disconnectOn { + mn.DisconnectPeers(instances[0].Peer, instances[i+1].Peer) + mn.UnlinkPeers(instances[0].Peer, instances[i+1].Peer) + time.Sleep(100 * time.Millisecond) + mn.LinkPeers(instances[0].Peer, instances[i+1].Peer) + } + case err, ok := <-errChan: + if !ok { + return + } + b.Fatalf("received error on request: %s", err.Error()) + } + } + }(j) + } + wg.Wait() + result := runStats{ + Time: time.Since(start), + Name: b.Name(), + } + benchmarkLog = append(benchmarkLog, result) + cancel() + fetcher.Close() + } + testinstance.Close(instances) + ig.Close() +} func subtestDistributeAndFetch(ctx context.Context, b *testing.B, numnodes int, d delay.D, bstoreLatency time.Duration, df distFunc, tdm *tempDirMaker) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -116,10 +189,10 @@ func subtestDistributeAndFetch(ctx context.Context, b *testing.B, numnodes int, type distFunc func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid -const unixfsChunkSize uint64 = 1 << 10 -const unixfsLinksPerLevel = 1024 +const defaultUnixfsChunkSize uint64 = 1 << 10 +const defaultUnixfsLinksPerLevel = 1024 -func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Blockstore, size uint64) cid.Cid { +func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Blockstore, size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int) cid.Cid { data := make([]byte, size) _, err := rand.Read(data) @@ -151,11 +224,11 @@ func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Block return nd.Cid() } -func allFilesUniformSize(size uint64) distFunc { +func allFilesUniformSize(size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int) distFunc { return func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid { cids := make([]cid.Cid, 0, len(provs)) for _, prov := range provs { - c := loadRandomUnixFxFile(ctx, b, prov.BlockStore, size) + c := loadRandomUnixFxFile(ctx, b, prov.BlockStore, size, unixfsChunkSize, unixfsLinksPerLevel) cids = append(cids, c) } return cids diff --git a/benchmarks/testnet/peernet.go b/benchmarks/testnet/peernet.go new file mode 100644 index 00000000..a8bd6959 --- /dev/null +++ b/benchmarks/testnet/peernet.go @@ -0,0 +1,40 @@ +package testnet + +import ( + "context" + + gsnet "github.com/ipfs/go-graphsync/network" + + "github.com/libp2p/go-libp2p-core/peer" + tnet "github.com/libp2p/go-libp2p-testing/net" + mockpeernet "github.com/libp2p/go-libp2p/p2p/net/mock" +) + +type peernet struct { + mockpeernet.Mocknet +} + +// StreamNet is a testnet that uses libp2p's MockNet +func StreamNet(ctx context.Context, net mockpeernet.Mocknet) Network { + return &peernet{net} +} + +func (pn *peernet) Adapter(p tnet.Identity) gsnet.GraphSyncNetwork { + client, err := pn.Mocknet.AddPeer(p.PrivateKey(), p.Address()) + if err != nil { + panic(err.Error()) + } + pn.Mocknet.LinkAll() + return gsnet.NewFromLibp2pHost(client) +} + +func (pn *peernet) HasPeer(p peer.ID) bool { + for _, member := range pn.Mocknet.Peers() { + if p == member { + return true + } + } + return false +} + +var _ Network = (*peernet)(nil)