From d414841e80229faebd9c6cefcad0a056d05be9f0 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Thu, 28 Dec 2023 18:45:36 +0100 Subject: [PATCH] bitswap/network,bitswap/client: move content routing responsabilities to an option of the client Given that the previous commit remove the content advertising from the server, it did not made sense to share these paths on the network. The code has been reworked: - addresses aren't magically added to the peerstore as a side-effect of calling `Network.FindProvidersAsync`. Instead they are passed as hints to ConnectTo which copies libp2p `host.ConnectTo` API. - the providerquerymanager is completely shutdown when not using `WithContentSearch` option, this helps usecase where `routinghelpers.Null` is used for content routing and the consumer exclusively rely on broadcast, like networks where most peoples have all the content (Filecoin, Celestia, ...). --- CHANGELOG.md | 5 + bitswap/benchmarks_test.go | 9 +- bitswap/bitswap_test.go | 30 ++--- bitswap/client/bitswap_with_sessions_test.go | 40 ++++-- bitswap/client/client.go | 120 ++++++++++-------- .../internal/messagequeue/messagequeue.go | 1 - .../messagequeue/messagequeue_test.go | 4 - .../providerquerymanager.go | 22 +++- .../providerquerymanager_test.go | 31 +++-- bitswap/client/internal/session/session.go | 7 +- bitswap/network/interface.go | 17 +-- bitswap/network/ipfs_impl.go | 41 +----- bitswap/network/ipfs_impl_test.go | 23 ++-- bitswap/options.go | 4 + bitswap/testinstance/testinstance.go | 22 +++- bitswap/testnet/network_test.go | 3 +- bitswap/testnet/peernet.go | 11 +- bitswap/testnet/virtual.go | 41 +----- blockservice/test/mock.go | 3 +- examples/go.mod | 2 +- examples/unixfs-file-cid/main.go | 5 +- fetcher/helpers/block_visitor_test.go | 5 +- fetcher/impl/blockservice/fetcher_test.go | 11 +- ipld/merkledag/merkledag_test.go | 5 +- 24 files changed, 222 insertions(+), 240 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 83e05b574..71b82366a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,9 +20,14 @@ The following emojis are used to highlight certain changes: - `blockservice.NewWritethrough` deprecated function has been removed, instead you can do `blockservice.New(..., ..., WriteThrough())` like previously. - `blockservice` now has `WithContentBlocker` option which allows to filter Add and Get requests by CID. - `blockservice` now have a `WithProvider` option, this allows to recreate the behavior of advertising added blocks the bitswap server used to do. +- `bitswap` & `bitswap/client` now have a `WithContentSearch` option, this pickup the content routing job from `bitswap/network`. + It used to be a commun pattern for consumers which do not need external content routing to pass a [`routinghelpers.Null`](https://pkg.go.dev/github.com/libp2p/go-libp2p-routing-helpers#Null), now this can be ommited completely which is more efficient. ### Changed +- 🛠 `bitswap/network` no longer manages content routing, related Methods and function Arguments have been removed. + - `Network.ConnectTo` method has been changed from [`peer.ID`](https://pkg.go.dev/github.com/libp2p/go-libp2p/core/peer#ID) to [`peer.AddrInfo`](https://pkg.go.dev/github.com/libp2p/go-libp2p/core/peer#AddrInfo), given adding addresses hints used to be a side effect of the network. Theses now need to be passed in as values. + ### Removed - 🛠 `bitswap` & `bitswap/server` no longer provide to content routers, instead you can use the `provider` package because it uses a datastore queue and batches calls to ProvideMany. diff --git a/bitswap/benchmarks_test.go b/bitswap/benchmarks_test.go index 80eb373ab..2a174d570 100644 --- a/bitswap/benchmarks_test.go +++ b/bitswap/benchmarks_test.go @@ -20,7 +20,6 @@ import ( bsnet "github.com/ipfs/boxo/bitswap/network" testinstance "github.com/ipfs/boxo/bitswap/testinstance" tn "github.com/ipfs/boxo/bitswap/testnet" - mockrouting "github.com/ipfs/boxo/routing/mock" cid "github.com/ipfs/go-cid" delay "github.com/ipfs/go-ipfs-delay" ) @@ -142,7 +141,7 @@ func BenchmarkFetchFromOldBitswap(b *testing.B) { oldSeedCount := bch.oldSeedCount newSeedCount := bch.nodeCount - (fetcherCount + oldSeedCount) - net := tn.VirtualNetwork(mockrouting.NewServer(), fixedDelay) + net := tn.VirtualNetwork(fixedDelay) // Simulate an older Bitswap node (old protocol ID) that doesn't // send DONT_HAVE responses @@ -294,7 +293,7 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) { numblks := 1000 for i := 0; i < b.N; i++ { - net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator) + net := tn.RateLimitedVirtualNetwork(d, rateLimitGenerator) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() @@ -312,7 +311,7 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) { func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, bstoreLatency time.Duration, df distFunc, ff fetchFunc) { for i := 0; i < b.N; i++ { - net := tn.VirtualNetwork(mockrouting.NewServer(), d) + net := tn.VirtualNetwork(d) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) @@ -327,7 +326,7 @@ func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, b func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, bstoreLatency time.Duration, df distFunc, ff fetchFunc) { for i := 0; i < b.N; i++ { - net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator) + net := tn.RateLimitedVirtualNetwork(d, rateLimitGenerator) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() diff --git a/bitswap/bitswap_test.go b/bitswap/bitswap_test.go index 7d2b1f924..7ffe09335 100644 --- a/bitswap/bitswap_test.go +++ b/bitswap/bitswap_test.go @@ -49,7 +49,7 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk const kNetworkDelay = 0 * time.Millisecond func TestClose(t *testing.T) { - vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil) defer ig.Close() bgen := blocksutil.NewBlockGenerator() @@ -66,7 +66,7 @@ func TestClose(t *testing.T) { func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this rs := mockrouting.NewServer() - net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() @@ -90,7 +90,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this } func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() @@ -118,7 +118,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { } func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) bsOpts := []bitswap.Option{bitswap.ProviderSearchDelay(50 * time.Millisecond)} ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts) @@ -150,7 +150,7 @@ func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) { // Tests that a received block is not stored in the blockstore if the block was // not requested by the client func TestUnwantedBlockNotAdded(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) bsMessage := bsmsg.New(true) bsMessage.AddBlock(block) @@ -186,7 +186,7 @@ func TestUnwantedBlockNotAdded(t *testing.T) { // (because the live request queue is full) func TestPendingBlockAdded(t *testing.T) { ctx := context.Background() - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) bg := blocksutil.NewBlockGenerator() sessionBroadcastWantCapacity := 4 @@ -278,7 +278,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { if testing.Short() { t.SkipNow() } - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) ig := testinstance.NewTestInstanceGenerator(net, nil, []bitswap.Option{ bitswap.TaskWorkerCount(5), bitswap.EngineTaskWorkerCount(5), @@ -335,7 +335,7 @@ func TestSendToWantingPeer(t *testing.T) { t.SkipNow() } - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() bg := blocksutil.NewBlockGenerator() @@ -373,7 +373,7 @@ func TestSendToWantingPeer(t *testing.T) { } func TestEmptyKey(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() bs := ig.Instances(1)[0].Exchange @@ -406,7 +406,7 @@ func assertStat(t *testing.T, st *bitswap.Stat, sblks, rblks, sdata, rdata uint6 } func TestBasicBitswap(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() bg := blocksutil.NewBlockGenerator() @@ -478,7 +478,7 @@ func TestBasicBitswap(t *testing.T) { } func TestDoubleGet(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() bg := blocksutil.NewBlockGenerator() @@ -543,7 +543,7 @@ func TestDoubleGet(t *testing.T) { } func TestWantlistCleanup(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() bg := blocksutil.NewBlockGenerator() @@ -665,7 +665,7 @@ func newReceipt(sent, recv, exchanged uint64) *server.Receipt { } func TestBitswapLedgerOneWay(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() bg := blocksutil.NewBlockGenerator() @@ -714,7 +714,7 @@ func TestBitswapLedgerOneWay(t *testing.T) { } func TestBitswapLedgerTwoWay(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() bg := blocksutil.NewBlockGenerator() @@ -803,7 +803,7 @@ func (tsl *testingScoreLedger) Stop() { // Tests start and stop of a custom decision logic func TestWithScoreLedger(t *testing.T) { tsl := newTestingScoreLedger() - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) bsOpts := []bitswap.Option{bitswap.WithScoreLedger(tsl)} ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts) defer ig.Close() diff --git a/bitswap/client/bitswap_with_sessions_test.go b/bitswap/client/bitswap_with_sessions_test.go index a3174d0a4..e3bc2ec1e 100644 --- a/bitswap/client/bitswap_with_sessions_test.go +++ b/bitswap/client/bitswap_with_sessions_test.go @@ -10,6 +10,7 @@ import ( "github.com/ipfs/boxo/bitswap" "github.com/ipfs/boxo/bitswap/client/internal/session" "github.com/ipfs/boxo/bitswap/client/traceability" + bsnet "github.com/ipfs/boxo/bitswap/network" testinstance "github.com/ipfs/boxo/bitswap/testinstance" tn "github.com/ipfs/boxo/bitswap/testnet" mockrouting "github.com/ipfs/boxo/routing/mock" @@ -18,13 +19,15 @@ import ( blocksutil "github.com/ipfs/go-ipfs-blocksutil" delay "github.com/ipfs/go-ipfs-delay" tu "github.com/libp2p/go-libp2p-testing/etc" + tnet "github.com/libp2p/go-libp2p-testing/net" "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" ) func getVirtualNetwork() tn.Network { // FIXME: the tests are really sensitive to the network delay. fix them to work // well under varying conditions - return tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) + return tn.VirtualNetwork(delay.Fixed(0)) } func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk blocks.Block) { @@ -37,10 +40,6 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk if err != nil { t.Fatal(err) } - err = inst.Adapter.Provide(ctx, blk.Cid()) - if err != nil { - t.Fatal(err) - } } func TestBasicSessions(t *testing.T) { @@ -114,7 +113,7 @@ func TestSessionBetweenPeers(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(time.Millisecond)) + vnet := tn.VirtualNetwork(delay.Fixed(time.Millisecond)) ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{bitswap.SetSimulateDontHavesOnTimeout(false)}) defer ig.Close() bgen := blocksutil.NewBlockGenerator() @@ -219,16 +218,23 @@ func TestFetchNotConnected(t *testing.T) { defer cancel() vnet := getVirtualNetwork() + rs := mockrouting.NewServer() ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{bitswap.ProviderSearchDelay(10 * time.Millisecond)}) defer ig.Close() bgen := blocksutil.NewBlockGenerator() - other := ig.Next() + var otherClient mockrouting.Client + other := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) { + otherClient = rs.Client(id) + return nil, nil // don't add content search, only the client needs it + }) // Provide 10 blocks on Peer A blks := bgen.Blocks(10) for _, block := range blks { addBlock(t, ctx, other, block) + err := otherClient.Provide(ctx, block.Cid(), true) + require.NoError(t, err) } var cids []cid.Cid @@ -239,7 +245,9 @@ func TestFetchNotConnected(t *testing.T) { // Request blocks with Peer B // Note: Peer A and Peer B are not initially connected, so this tests // that Peer B will search for and find Peer A - thisNode := ig.Next() + thisNode := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) { + return nil, []bitswap.Option{bitswap.WithContentSearch(rs.Client(id))} + }) ses := thisNode.Exchange.NewSession(ctx).(*session.Session) ses.SetBaseTickDelay(time.Millisecond * 10) @@ -262,6 +270,7 @@ func TestFetchAfterDisconnect(t *testing.T) { defer cancel() vnet := getVirtualNetwork() + rs := mockrouting.NewServer() ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{ bitswap.ProviderSearchDelay(10 * time.Millisecond), bitswap.RebroadcastDelay(delay.Fixed(15 * time.Millisecond)), @@ -269,9 +278,11 @@ func TestFetchAfterDisconnect(t *testing.T) { defer ig.Close() bgen := blocksutil.NewBlockGenerator() - inst := ig.Instances(2) - peerA := inst[0] - peerB := inst[1] + var aClient mockrouting.Client + peerA := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) { + aClient = rs.Client(id) + return nil, nil // don't add content search, only the client needs it + }) // Provide 5 blocks on Peer A blks := bgen.Blocks(10) @@ -283,9 +294,14 @@ func TestFetchAfterDisconnect(t *testing.T) { firstBlks := blks[:5] for _, block := range firstBlks { addBlock(t, ctx, peerA, block) + err := aClient.Provide(ctx, block.Cid(), true) + require.NoError(t, err) } // Request all blocks with Peer B + peerB := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) { + return nil, []bitswap.Option{bitswap.WithContentSearch(rs.Client(id))} + }) ses := peerB.Exchange.NewSession(ctx).(*session.Session) ses.SetBaseTickDelay(time.Millisecond * 10) @@ -317,6 +333,8 @@ func TestFetchAfterDisconnect(t *testing.T) { lastBlks := blks[5:] for _, block := range lastBlks { addBlock(t, ctx, peerA, block) + err := aClient.Provide(ctx, block.Cid(), true) + require.NoError(t, err) } // Peer B should call FindProviders() and find Peer A diff --git a/bitswap/client/client.go b/bitswap/client/client.go index aa9ab78fa..f01851bee 100644 --- a/bitswap/client/client.go +++ b/bitswap/client/client.go @@ -99,6 +99,18 @@ func WithoutDuplicatedBlockStats() Option { } } +type ContentSearcher = bspqm.ContentRouter + +// WithContentSearch allows the client to search for providers when it is not +// able to find the content itself. +// Helps seeding sessions in networks where a significant amount of the peers +// connected do not have the content you want to download. +func WithContentSearch(router ContentSearcher) Option { + return func(bs *Client) { + bs.router = router + } +} + type BlockReceivedNotifier interface { // ReceivedBlocks notifies the decision engine that a peer is well-behaving // and gave us useful data, potentially increasing its score and making us @@ -121,56 +133,13 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore return nil }) - // onDontHaveTimeout is called when a want-block is sent to a peer that - // has an old version of Bitswap that doesn't support DONT_HAVE messages, - // or when no response is received within a timeout. - var sm *bssm.SessionManager - var bs *Client - onDontHaveTimeout := func(p peer.ID, dontHaves []cid.Cid) { - // Simulate a message arriving with DONT_HAVEs - if bs.simulateDontHavesOnTimeout { - sm.ReceiveFrom(ctx, p, nil, nil, dontHaves) - } - } - peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue { - return bsmq.New(ctx, p, network, onDontHaveTimeout) - } - - sim := bssim.New() - bpm := bsbpm.New() - pm := bspm.New(ctx, peerQueueFactory, network.Self()) - pqm := bspqm.New(ctx, network) - - sessionFactory := func( - sessctx context.Context, - sessmgr bssession.SessionManager, - id uint64, - spm bssession.SessionPeerManager, - sim *bssim.SessionInterestManager, - pm bssession.PeerManager, - bpm *bsbpm.BlockPresenceManager, - notif notifications.PubSub, - provSearchDelay time.Duration, - rebroadcastDelay delay.D, - self peer.ID, - ) bssm.Session { - return bssession.New(sessctx, sessmgr, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self) - } - sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager { - return bsspm.New(id, network.ConnectionManager()) - } - notif := notifications.New() - sm = bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self()) - - bs = &Client{ + bs := &Client{ + ctx: ctx, blockstore: bstore, network: network, process: px, - pm: pm, - pqm: pqm, - sm: sm, - sim: sim, - notif: notif, + sim: bssim.New(), + notif: notifications.New(), counters: new(counters), dupMetric: bmetrics.DupHist(ctx), allMetric: bmetrics.AllHist(ctx), @@ -178,21 +147,26 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore rebroadcastDelay: delay.Fixed(defaults.RebroadcastDelay), simulateDontHavesOnTimeout: true, } + bs.pm = bspm.New(ctx, bs.peerQueueFactory, network.Self()) + bs.sm = bssm.New(ctx, bs.sessionFactory, bs.sim, bs.sessionPeerManagerFactory, bsbpm.New(), bs.pm, bs.notif, network.Self()) // apply functional options before starting and running bitswap for _, option := range options { option(bs) } - bs.pqm.Startup() + if bs.router != nil { + bs.pqm = bspqm.New(ctx, network, bs.router) + bs.pqm.Startup() + } // bind the context and process. // do it over here to avoid closing before all setup is done. go func() { <-px.Closing() // process closes first - sm.Shutdown() + bs.sm.Shutdown() cancelFunc() - notif.Shutdown() + bs.notif.Shutdown() }() procctx.CloseAfterContext(px, ctx) // parent cancelled first @@ -201,9 +175,12 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore // Client instances implement the bitswap protocol. type Client struct { + ctx context.Context + pm *bspm.PeerManager // the provider query manager manages requests to find providers + // is nil if content routing is disabled pqm *bspqm.ProviderQueryManager // network delivers messages on behalf of the session @@ -244,6 +221,9 @@ type Client struct { blockReceivedNotifier BlockReceivedNotifier + // optional content router + router ContentSearcher + // whether we should actually simulate dont haves on request timeout simulateDontHavesOnTimeout bool @@ -251,6 +231,46 @@ type Client struct { skipDuplicatedBlocksStats bool } +func (bs *Client) sessionFactory( + sessctx context.Context, + sessmgr bssession.SessionManager, + id uint64, + spm bssession.SessionPeerManager, + sim *bssim.SessionInterestManager, + pm bssession.PeerManager, + bpm *bsbpm.BlockPresenceManager, + notif notifications.PubSub, + provSearchDelay time.Duration, + rebroadcastDelay delay.D, + self peer.ID, +) bssm.Session { + // avoid typed nils + var pqm bssession.ProviderFinder + if bs.pqm != nil { + pqm = bs.pqm + } + + return bssession.New(sessctx, sessmgr, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self) +} + +// onDontHaveTimeout is called when a want-block is sent to a peer that +// has an old version of Bitswap that doesn't support DONT_HAVE messages, +// or when no response is received within a timeout. +func (bs *Client) onDontHaveTimeout(p peer.ID, dontHaves []cid.Cid) { + // Simulate a message arriving with DONT_HAVEs + if bs.simulateDontHavesOnTimeout { + bs.sm.ReceiveFrom(bs.ctx, p, nil, nil, dontHaves) + } +} + +func (bs *Client) peerQueueFactory(ctx context.Context, p peer.ID) bspm.PeerQueue { + return bsmq.New(ctx, p, bs.network, bs.onDontHaveTimeout) +} + +func (bs *Client) sessionPeerManagerFactory(ctx context.Context, id uint64) bssession.SessionPeerManager { + return bsspm.New(id, bs.network.ConnectionManager()) +} + type counters struct { blocksRecvd uint64 dupBlocksRecvd uint64 diff --git a/bitswap/client/internal/messagequeue/messagequeue.go b/bitswap/client/internal/messagequeue/messagequeue.go index 4f90f239b..97b4f40d2 100644 --- a/bitswap/client/internal/messagequeue/messagequeue.go +++ b/bitswap/client/internal/messagequeue/messagequeue.go @@ -51,7 +51,6 @@ const ( // MessageNetwork is any network that can connect peers and generate a message // sender. type MessageNetwork interface { - ConnectTo(context.Context, peer.ID) error NewMessageSender(context.Context, peer.ID, *bsnet.MessageSenderOpts) (bsnet.MessageSender, error) Latency(peer.ID) time.Duration Ping(context.Context, peer.ID) ping.Result diff --git a/bitswap/client/internal/messagequeue/messagequeue_test.go b/bitswap/client/internal/messagequeue/messagequeue_test.go index 4d361c5d5..31d66a04a 100644 --- a/bitswap/client/internal/messagequeue/messagequeue_test.go +++ b/bitswap/client/internal/messagequeue/messagequeue_test.go @@ -26,10 +26,6 @@ type fakeMessageNetwork struct { messageSender bsnet.MessageSender } -func (fmn *fakeMessageNetwork) ConnectTo(context.Context, peer.ID) error { - return fmn.connectError -} - func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID, *bsnet.MessageSenderOpts) (bsnet.MessageSender, error) { if fmn.messageSenderError == nil { return fmn.messageSender, nil diff --git a/bitswap/client/internal/providerquerymanager/providerquerymanager.go b/bitswap/client/internal/providerquerymanager/providerquerymanager.go index f918c409a..7c9b6bb22 100644 --- a/bitswap/client/internal/providerquerymanager/providerquerymanager.go +++ b/bitswap/client/internal/providerquerymanager/providerquerymanager.go @@ -34,8 +34,12 @@ type findProviderRequest struct { // ProviderQueryNetwork is an interface for finding providers and connecting to // peers. type ProviderQueryNetwork interface { - ConnectTo(context.Context, peer.ID) error - FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID + Self() peer.ID + ConnectTo(context.Context, peer.AddrInfo) error +} + +type ContentRouter interface { + FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.AddrInfo } type providerQueryMessage interface { @@ -75,6 +79,7 @@ type cancelRequestMessage struct { type ProviderQueryManager struct { ctx context.Context network ProviderQueryNetwork + router ContentRouter providerQueryMessages chan providerQueryMessage providerRequestsProcessing chan *findProviderRequest incomingFindProviderRequests chan *findProviderRequest @@ -88,10 +93,11 @@ type ProviderQueryManager struct { // New initializes a new ProviderQueryManager for a given context and a given // network provider. -func New(ctx context.Context, network ProviderQueryNetwork) *ProviderQueryManager { +func New(ctx context.Context, network ProviderQueryNetwork, router ContentRouter) *ProviderQueryManager { return &ProviderQueryManager{ ctx: ctx, network: network, + router: router, providerQueryMessages: make(chan providerQueryMessage, 16), providerRequestsProcessing: make(chan *findProviderRequest), incomingFindProviderRequests: make(chan *findProviderRequest), @@ -235,11 +241,15 @@ func (pqm *ProviderQueryManager) findProviderWorker() { pqm.timeoutMutex.RLock() findProviderCtx, cancel := context.WithTimeout(fpr.ctx, pqm.findProviderTimeout) pqm.timeoutMutex.RUnlock() - providers := pqm.network.FindProvidersAsync(findProviderCtx, k, maxProviders) + providers := pqm.router.FindProvidersAsync(findProviderCtx, k, maxProviders) wg := &sync.WaitGroup{} for p := range providers { + if p.ID == pqm.network.Self() { + continue // ignore self as provider + } + wg.Add(1) - go func(p peer.ID) { + go func(p peer.AddrInfo) { defer wg.Done() err := pqm.network.ConnectTo(findProviderCtx, p) if err != nil { @@ -250,7 +260,7 @@ func (pqm *ProviderQueryManager) findProviderWorker() { case pqm.providerQueryMessages <- &receivedProviderMessage{ ctx: findProviderCtx, k: k, - p: p, + p: p.ID, }: case <-pqm.ctx.Done(): return diff --git a/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go b/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go index 52447e2c1..518a904ed 100644 --- a/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go +++ b/bitswap/client/internal/providerquerymanager/providerquerymanager_test.go @@ -14,6 +14,7 @@ import ( ) type fakeProviderNetwork struct { + self peer.ID peersFound []peer.ID connectError error delay time.Duration @@ -23,17 +24,21 @@ type fakeProviderNetwork struct { liveQueries int } -func (fpn *fakeProviderNetwork) ConnectTo(context.Context, peer.ID) error { +func (fpn *fakeProviderNetwork) Self() peer.ID { + return fpn.self +} + +func (fpn *fakeProviderNetwork) ConnectTo(context.Context, peer.AddrInfo) error { time.Sleep(fpn.connectDelay) return fpn.connectError } -func (fpn *fakeProviderNetwork) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID { +func (fpn *fakeProviderNetwork) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.AddrInfo { fpn.queriesMadeMutex.Lock() fpn.queriesMade++ fpn.liveQueries++ fpn.queriesMadeMutex.Unlock() - incomingPeers := make(chan peer.ID) + incomingPeers := make(chan peer.AddrInfo) go func() { defer close(incomingPeers) for _, p := range fpn.peersFound { @@ -44,7 +49,7 @@ func (fpn *fakeProviderNetwork) FindProvidersAsync(ctx context.Context, k cid.Ci default: } select { - case incomingPeers <- p: + case incomingPeers <- peer.AddrInfo{ID: p}: case <-ctx.Done(): return } @@ -64,7 +69,7 @@ func TestNormalSimultaneousFetch(t *testing.T) { delay: 1 * time.Millisecond, } ctx := context.Background() - providerQueryManager := New(ctx, fpn) + providerQueryManager := New(ctx, fpn, fpn) providerQueryManager.Startup() keys := testutil.GenerateCids(2) @@ -101,7 +106,7 @@ func TestDedupingProviderRequests(t *testing.T) { delay: 1 * time.Millisecond, } ctx := context.Background() - providerQueryManager := New(ctx, fpn) + providerQueryManager := New(ctx, fpn, fpn) providerQueryManager.Startup() key := testutil.GenerateCids(1)[0] @@ -141,7 +146,7 @@ func TestCancelOneRequestDoesNotTerminateAnother(t *testing.T) { delay: 1 * time.Millisecond, } ctx := context.Background() - providerQueryManager := New(ctx, fpn) + providerQueryManager := New(ctx, fpn, fpn) providerQueryManager.Startup() key := testutil.GenerateCids(1)[0] @@ -187,7 +192,7 @@ func TestCancelManagerExitsGracefully(t *testing.T) { ctx := context.Background() managerCtx, managerCancel := context.WithTimeout(ctx, 5*time.Millisecond) defer managerCancel() - providerQueryManager := New(managerCtx, fpn) + providerQueryManager := New(managerCtx, fpn, fpn) providerQueryManager.Startup() key := testutil.GenerateCids(1)[0] @@ -221,7 +226,7 @@ func TestPeersWithConnectionErrorsNotAddedToPeerList(t *testing.T) { delay: 1 * time.Millisecond, } ctx := context.Background() - providerQueryManager := New(ctx, fpn) + providerQueryManager := New(ctx, fpn, fpn) providerQueryManager.Startup() key := testutil.GenerateCids(1)[0] @@ -255,7 +260,7 @@ func TestRateLimitingRequests(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) defer cancel() - providerQueryManager := New(ctx, fpn) + providerQueryManager := New(ctx, fpn, fpn) providerQueryManager.Startup() keys := testutil.GenerateCids(maxInProcessRequests + 1) @@ -292,7 +297,7 @@ func TestFindProviderTimeout(t *testing.T) { delay: 10 * time.Millisecond, } ctx := context.Background() - providerQueryManager := New(ctx, fpn) + providerQueryManager := New(ctx, fpn, fpn) providerQueryManager.Startup() providerQueryManager.SetFindProviderTimeout(2 * time.Millisecond) keys := testutil.GenerateCids(1) @@ -316,7 +321,7 @@ func TestFindProviderPreCanceled(t *testing.T) { delay: 1 * time.Millisecond, } ctx := context.Background() - providerQueryManager := New(ctx, fpn) + providerQueryManager := New(ctx, fpn, fpn) providerQueryManager.Startup() providerQueryManager.SetFindProviderTimeout(100 * time.Millisecond) keys := testutil.GenerateCids(1) @@ -341,7 +346,7 @@ func TestCancelFindProvidersAfterCompletion(t *testing.T) { delay: 1 * time.Millisecond, } ctx := context.Background() - providerQueryManager := New(ctx, fpn) + providerQueryManager := New(ctx, fpn, fpn) providerQueryManager.Startup() providerQueryManager.SetFindProviderTimeout(100 * time.Millisecond) keys := testutil.GenerateCids(1) diff --git a/bitswap/client/internal/session/session.go b/bitswap/client/internal/session/session.go index 39266a5e6..462803bfa 100644 --- a/bitswap/client/internal/session/session.go +++ b/bitswap/client/internal/session/session.go @@ -108,7 +108,7 @@ type Session struct { sm SessionManager pm PeerManager sprm SessionPeerManager - providerFinder ProviderFinder + providerFinder ProviderFinder // optional, nil when missing sim *bssim.SessionInterestManager sw sessionWants @@ -141,6 +141,7 @@ func New( sm SessionManager, id uint64, sprm SessionPeerManager, + // providerFinder might be nil providerFinder ProviderFinder, sim *bssim.SessionInterestManager, pm PeerManager, @@ -391,6 +392,10 @@ func (s *Session) handlePeriodicSearch(ctx context.Context) { // findMorePeers attempts to find more peers for a session by searching for // providers for the given Cid func (s *Session) findMorePeers(ctx context.Context, c cid.Cid) { + if s.providerFinder == nil { + // ¯\_(ツ)_/¯ + return + } go func(k cid.Cid) { for p := range s.providerFinder.FindProvidersAsync(ctx, k) { // When a provider indicates that it has a cid, it's equivalent to diff --git a/bitswap/network/interface.go b/bitswap/network/interface.go index 962bc2588..dceb5f8c6 100644 --- a/bitswap/network/interface.go +++ b/bitswap/network/interface.go @@ -7,8 +7,6 @@ import ( bsmsg "github.com/ipfs/boxo/bitswap/message" "github.com/ipfs/boxo/bitswap/network/internal" - cid "github.com/ipfs/go-cid" - "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/protocol/ping" @@ -40,7 +38,8 @@ type BitSwapNetwork interface { // Stop stops the network service. Stop() - ConnectTo(context.Context, peer.ID) error + // ConnectTo attempts to connect to the peer, using the passed addresses as a hint, they can be empty. + ConnectTo(context.Context, peer.AddrInfo) error DisconnectFrom(context.Context, peer.ID) error NewMessageSender(context.Context, peer.ID, *MessageSenderOpts) (MessageSender, error) @@ -49,8 +48,6 @@ type BitSwapNetwork interface { Stats() Stats - Routing - Pinger } @@ -84,16 +81,6 @@ type Receiver interface { PeerDisconnected(peer.ID) } -// Routing is an interface to providing and finding providers on a bitswap -// network. -type Routing interface { - // FindProvidersAsync returns a channel of providers for the given key. - FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID - - // Provide provides the key to the network. - Provide(context.Context, cid.Cid) error -} - // Pinger is an interface to ping a peer and get the average latency of all pings type Pinger interface { // Ping a peer diff --git a/bitswap/network/ipfs_impl.go b/bitswap/network/ipfs_impl.go index a1446775c..4937f5ab8 100644 --- a/bitswap/network/ipfs_impl.go +++ b/bitswap/network/ipfs_impl.go @@ -11,15 +11,12 @@ import ( bsmsg "github.com/ipfs/boxo/bitswap/message" "github.com/ipfs/boxo/bitswap/network/internal" - cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" - peerstore "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/protocol" - "github.com/libp2p/go-libp2p/core/routing" "github.com/libp2p/go-libp2p/p2p/protocol/ping" msgio "github.com/libp2p/go-msgio" ma "github.com/multiformats/go-multiaddr" @@ -38,12 +35,11 @@ var ( ) // NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host. -func NewFromIpfsHost(host host.Host, r routing.ContentRouting, opts ...NetOpt) BitSwapNetwork { +func NewFromIpfsHost(host host.Host, opts ...NetOpt) BitSwapNetwork { s := processSettings(opts...) bitswapNetwork := impl{ - host: host, - routing: r, + host: host, protocolBitswapNoVers: s.ProtocolPrefix + ProtocolBitswapNoVers, protocolBitswapOneZero: s.ProtocolPrefix + ProtocolBitswapOneZero, @@ -75,7 +71,6 @@ type impl struct { stats Stats host host.Host - routing routing.ContentRouting connectEvtMgr *connectEventManager protocolBitswapNoVers protocol.ID @@ -106,7 +101,7 @@ func (s *streamMessageSender) Connect(ctx context.Context) (network.Stream, erro tctx, cancel := context.WithTimeout(ctx, s.opts.SendTimeout) defer cancel() - if err := s.bsnet.ConnectTo(tctx, s.to); err != nil { + if err := s.bsnet.ConnectTo(tctx, peer.AddrInfo{ID: s.to}); err != nil { return nil, err } @@ -365,40 +360,14 @@ func (bsnet *impl) Stop() { bsnet.host.Network().StopNotify((*netNotifiee)(bsnet)) } -func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error { - return bsnet.host.Connect(ctx, peer.AddrInfo{ID: p}) +func (bsnet *impl) ConnectTo(ctx context.Context, p peer.AddrInfo) error { + return bsnet.host.Connect(ctx, p) } func (bsnet *impl) DisconnectFrom(ctx context.Context, p peer.ID) error { return bsnet.host.Network().ClosePeer(p) } -// FindProvidersAsync returns a channel of providers for the given key. -func (bsnet *impl) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID { - out := make(chan peer.ID, max) - go func() { - defer close(out) - providers := bsnet.routing.FindProvidersAsync(ctx, k, max) - for info := range providers { - if info.ID == bsnet.host.ID() { - continue // ignore self as provider - } - bsnet.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.TempAddrTTL) - select { - case <-ctx.Done(): - return - case out <- info.ID: - } - } - }() - return out -} - -// Provide provides the key to the network -func (bsnet *impl) Provide(ctx context.Context, k cid.Cid) error { - return bsnet.routing.Provide(ctx, k, true) -} - // handleNewStream receives a new stream from the network. func (bsnet *impl) handleNewStream(s network.Stream) { defer s.Close() diff --git a/bitswap/network/ipfs_impl_test.go b/bitswap/network/ipfs_impl_test.go index af76e20d6..5529564f8 100644 --- a/bitswap/network/ipfs_impl_test.go +++ b/bitswap/network/ipfs_impl_test.go @@ -13,8 +13,6 @@ import ( bsnet "github.com/ipfs/boxo/bitswap/network" "github.com/ipfs/boxo/bitswap/network/internal" tn "github.com/ipfs/boxo/bitswap/testnet" - mockrouting "github.com/ipfs/boxo/routing/mock" - ds "github.com/ipfs/go-datastore" blocksutil "github.com/ipfs/go-ipfs-blocksutil" tnet "github.com/libp2p/go-libp2p-testing/net" "github.com/libp2p/go-libp2p/core/host" @@ -170,8 +168,7 @@ func TestMessageSendAndReceive(t *testing.T) { defer cancel() mn := mocknet.New() defer mn.Close() - mr := mockrouting.NewServer() - streamNet, err := tn.StreamNet(ctx, mn, mr) + streamNet, err := tn.StreamNet(ctx, mn) if err != nil { t.Fatal("Unable to setup network") } @@ -191,7 +188,7 @@ func TestMessageSendAndReceive(t *testing.T) { if err != nil { t.Fatal(err) } - err = bsnet1.ConnectTo(ctx, p2.ID()) + err = bsnet1.ConnectTo(ctx, peer.AddrInfo{ID: p2.ID()}) if err != nil { t.Fatal(err) } @@ -200,7 +197,7 @@ func TestMessageSendAndReceive(t *testing.T) { t.Fatal("did not connect peer") case <-r1.connectionEvent: } - err = bsnet2.ConnectTo(ctx, p1.ID()) + err = bsnet2.ConnectTo(ctx, peer.AddrInfo{ID: p1.ID()}) if err != nil { t.Fatal(err) } @@ -274,7 +271,6 @@ func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *rec // create network mn := mocknet.New() defer mn.Close() - mr := mockrouting.NewServer() // Host 1 h1, err := mn.AddPeer(p1.PrivateKey(), p1.Address()) @@ -282,8 +278,7 @@ func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *rec t.Fatal(err) } eh1 := &ErrHost{Host: h1} - routing1 := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore()) - bsnet1 := bsnet.NewFromIpfsHost(eh1, routing1) + bsnet1 := bsnet.NewFromIpfsHost(eh1) bsnet1.Start(r1) t.Cleanup(bsnet1.Stop) if r1.listener != nil { @@ -296,8 +291,7 @@ func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *rec t.Fatal(err) } eh2 := &ErrHost{Host: h2} - routing2 := mr.ClientWithDatastore(context.TODO(), p2, ds.NewMapDatastore()) - bsnet2 := bsnet.NewFromIpfsHost(eh2, routing2) + bsnet2 := bsnet.NewFromIpfsHost(eh2) bsnet2.Start(r2) t.Cleanup(bsnet2.Stop) if r2.listener != nil { @@ -309,7 +303,7 @@ func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *rec if err != nil { t.Fatal(err) } - err = bsnet1.ConnectTo(ctx, p2.ID()) + err = bsnet1.ConnectTo(ctx, peer.AddrInfo{ID: p2.ID()}) if err != nil { t.Fatal(err) } @@ -318,7 +312,7 @@ func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *rec t.Fatal("Expected connect event") } - err = bsnet2.ConnectTo(ctx, p1.ID()) + err = bsnet2.ConnectTo(ctx, peer.AddrInfo{ID: p1.ID()}) if err != nil { t.Fatal(err) } @@ -454,8 +448,7 @@ func TestSupportsHave(t *testing.T) { ctx := context.Background() mn := mocknet.New() defer mn.Close() - mr := mockrouting.NewServer() - streamNet, err := tn.StreamNet(ctx, mn, mr) + streamNet, err := tn.StreamNet(ctx, mn) if err != nil { t.Fatalf("Unable to setup network: %s", err) } diff --git a/bitswap/options.go b/bitswap/options.go index 9bea0b637..32ea3c767 100644 --- a/bitswap/options.go +++ b/bitswap/options.go @@ -75,6 +75,10 @@ func SetSimulateDontHavesOnTimeout(send bool) Option { return Option{client.SetSimulateDontHavesOnTimeout(send)} } +func WithContentSearch(router client.ContentSearcher) Option { + return Option{client.WithContentSearch(router)} +} + func WithTracer(tap tracer.Tracer) Option { // Only trace the server, both receive the same messages anyway return Option{ diff --git a/bitswap/testinstance/testinstance.go b/bitswap/testinstance/testinstance.go index 5a052b831..037acbad4 100644 --- a/bitswap/testinstance/testinstance.go +++ b/bitswap/testinstance/testinstance.go @@ -49,12 +49,28 @@ func (g *InstanceGenerator) Close() error { // Next generates a new instance of bitswap + dependencies func (g *InstanceGenerator) Next() Instance { + return g.NextWithExtraOptions(nil) +} + +// NextWithExtraOptions is like [Next] but it will callback with a fake identity and append extra options. +// If extraOpts is nil, it will ignore it. +func (g *InstanceGenerator) NextWithExtraOptions(extraOpts func(p tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option)) Instance { g.seq++ p, err := p2ptestutil.RandTestBogusIdentity() if err != nil { - panic("FIXME") // TODO change signature + panic(err.Error()) // TODO change signature } - return NewInstance(g.ctx, g.net, p, g.netOptions, g.bsOptions) + + var extraNet []bsnet.NetOpt + var extraBitswap []bitswap.Option + if extraOpts != nil { + extraNet, extraBitswap = extraOpts(p) + } + + return NewInstance(g.ctx, g.net, p, + append(g.netOptions[:len(g.netOptions):len(g.netOptions)], extraNet...), + append(g.bsOptions[:len(g.bsOptions):len(g.bsOptions)], extraBitswap...), + ) } // Instances creates N test instances of bitswap + dependencies and connects @@ -74,7 +90,7 @@ func ConnectInstances(instances []Instance) { for i, inst := range instances { for j := i + 1; j < len(instances); j++ { oinst := instances[j] - err := inst.Adapter.ConnectTo(context.Background(), oinst.Peer) + err := inst.Adapter.ConnectTo(context.Background(), peer.AddrInfo{ID: oinst.Peer}) if err != nil { panic(err.Error()) } diff --git a/bitswap/testnet/network_test.go b/bitswap/testnet/network_test.go index 0947eff3e..2d45e09b1 100644 --- a/bitswap/testnet/network_test.go +++ b/bitswap/testnet/network_test.go @@ -8,7 +8,6 @@ import ( bsmsg "github.com/ipfs/boxo/bitswap/message" bsnet "github.com/ipfs/boxo/bitswap/network" - mockrouting "github.com/ipfs/boxo/routing/mock" blocks "github.com/ipfs/go-block-format" delay "github.com/ipfs/go-ipfs-delay" @@ -17,7 +16,7 @@ import ( ) func TestSendMessageAsyncButWaitForResponse(t *testing.T) { - net := VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) + net := VirtualNetwork(delay.Fixed(0)) responderPeer := tnet.RandIdentityOrFatal(t) waiter := net.Adapter(tnet.RandIdentityOrFatal(t)) responder := net.Adapter(responderPeer) diff --git a/bitswap/testnet/peernet.go b/bitswap/testnet/peernet.go index e4df19699..9abf189b7 100644 --- a/bitswap/testnet/peernet.go +++ b/bitswap/testnet/peernet.go @@ -5,9 +5,6 @@ import ( bsnet "github.com/ipfs/boxo/bitswap/network" - mockrouting "github.com/ipfs/boxo/routing/mock" - ds "github.com/ipfs/go-datastore" - tnet "github.com/libp2p/go-libp2p-testing/net" "github.com/libp2p/go-libp2p/core/peer" mockpeernet "github.com/libp2p/go-libp2p/p2p/net/mock" @@ -15,12 +12,11 @@ import ( type peernet struct { mockpeernet.Mocknet - routingserver mockrouting.Server } // StreamNet is a testnet that uses libp2p's MockNet -func StreamNet(ctx context.Context, net mockpeernet.Mocknet, rs mockrouting.Server) (Network, error) { - return &peernet{net, rs}, nil +func StreamNet(ctx context.Context, net mockpeernet.Mocknet) (Network, error) { + return &peernet{net}, nil } func (pn *peernet) Adapter(p tnet.Identity, opts ...bsnet.NetOpt) bsnet.BitSwapNetwork { @@ -28,8 +24,7 @@ func (pn *peernet) Adapter(p tnet.Identity, opts ...bsnet.NetOpt) bsnet.BitSwapN if err != nil { panic(err.Error()) } - routing := pn.routingserver.ClientWithDatastore(context.TODO(), p, ds.NewMapDatastore()) - return bsnet.NewFromIpfsHost(client, routing, opts...) + return bsnet.NewFromIpfsHost(client, opts...) } func (pn *peernet) HasPeer(p peer.ID) bool { diff --git a/bitswap/testnet/virtual.go b/bitswap/testnet/virtual.go index 914044aed..252362655 100644 --- a/bitswap/testnet/virtual.go +++ b/bitswap/testnet/virtual.go @@ -11,27 +11,23 @@ import ( bsmsg "github.com/ipfs/boxo/bitswap/message" bsnet "github.com/ipfs/boxo/bitswap/network" - mockrouting "github.com/ipfs/boxo/routing/mock" - cid "github.com/ipfs/go-cid" delay "github.com/ipfs/go-ipfs-delay" tnet "github.com/libp2p/go-libp2p-testing/net" "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/peer" protocol "github.com/libp2p/go-libp2p/core/protocol" - "github.com/libp2p/go-libp2p/core/routing" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/libp2p/go-libp2p/p2p/protocol/ping" ) // VirtualNetwork generates a new testnet instance - a fake network that // is used to simulate sending messages. -func VirtualNetwork(rs mockrouting.Server, d delay.D) Network { +func VirtualNetwork(d delay.D) Network { return &network{ latencies: make(map[peer.ID]map[peer.ID]time.Duration), clients: make(map[peer.ID]*receiverQueue), delay: d, - routingserver: rs, isRateLimited: false, rateLimitGenerator: nil, conns: make(map[string]struct{}), @@ -45,13 +41,12 @@ type RateLimitGenerator interface { // RateLimitedVirtualNetwork generates a testnet instance where nodes are rate // limited in the upload/download speed. -func RateLimitedVirtualNetwork(rs mockrouting.Server, d delay.D, rateLimitGenerator RateLimitGenerator) Network { +func RateLimitedVirtualNetwork(d delay.D, rateLimitGenerator RateLimitGenerator) Network { return &network{ latencies: make(map[peer.ID]map[peer.ID]time.Duration), rateLimiters: make(map[peer.ID]map[peer.ID]*mocknet.RateLimiter), clients: make(map[peer.ID]*receiverQueue), delay: d, - routingserver: rs, isRateLimited: true, rateLimitGenerator: rateLimitGenerator, conns: make(map[string]struct{}), @@ -63,7 +58,6 @@ type network struct { latencies map[peer.ID]map[peer.ID]time.Duration rateLimiters map[peer.ID]map[peer.ID]*mocknet.RateLimiter clients map[peer.ID]*receiverQueue - routingserver mockrouting.Server delay delay.D isRateLimited bool rateLimitGenerator RateLimitGenerator @@ -105,7 +99,6 @@ func (n *network) Adapter(p tnet.Identity, opts ...bsnet.NetOpt) bsnet.BitSwapNe client := &networkClient{ local: p.ID(), network: n, - routing: n.routingserver.Client(p), supportedProtocols: s.SupportedProtocols, } n.clients[p.ID()] = &receiverQueue{receiver: client} @@ -192,7 +185,6 @@ type networkClient struct { local peer.ID receivers []bsnet.Receiver network *network - routing routing.Routing supportedProtocols []protocol.ID } @@ -253,27 +245,6 @@ func (nc *networkClient) Stats() bsnet.Stats { } } -// FindProvidersAsync returns a channel of providers for the given key. -func (nc *networkClient) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID { - // NB: this function duplicates the AddrInfo -> ID transformation in the - // bitswap network adapter. Not to worry. This network client will be - // deprecated once the ipfsnet.Mock is added. The code below is only - // temporary. - - out := make(chan peer.ID) - go func() { - defer close(out) - providers := nc.routing.FindProvidersAsync(ctx, k, max) - for info := range providers { - select { - case <-ctx.Done(): - case out <- info.ID: - } - } - }() - return out -} - func (nc *networkClient) ConnectionManager() connmgr.ConnManager { return &connmgr.NullConnMgr{} } @@ -322,11 +293,6 @@ func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID, opts * }, nil } -// Provide provides the key to the network. -func (nc *networkClient) Provide(ctx context.Context, k cid.Cid) error { - return nc.routing.Provide(ctx, k, true) -} - func (nc *networkClient) Start(r ...bsnet.Receiver) { nc.receivers = r } @@ -334,7 +300,8 @@ func (nc *networkClient) Start(r ...bsnet.Receiver) { func (nc *networkClient) Stop() { } -func (nc *networkClient) ConnectTo(_ context.Context, p peer.ID) error { +func (nc *networkClient) ConnectTo(_ context.Context, info peer.AddrInfo) error { + p := info.ID nc.network.mu.Lock() otherClient, ok := nc.network.clients[p] if !ok { diff --git a/blockservice/test/mock.go b/blockservice/test/mock.go index e33551f4c..6cd7ed6d2 100644 --- a/blockservice/test/mock.go +++ b/blockservice/test/mock.go @@ -4,13 +4,12 @@ import ( testinstance "github.com/ipfs/boxo/bitswap/testinstance" tn "github.com/ipfs/boxo/bitswap/testnet" "github.com/ipfs/boxo/blockservice" - mockrouting "github.com/ipfs/boxo/routing/mock" delay "github.com/ipfs/go-ipfs-delay" ) // Mocks returns |n| connected mock Blockservices func Mocks(n int, opts ...blockservice.Option) []*blockservice.BlockService { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) + net := tn.VirtualNetwork(delay.Fixed(0)) sg := testinstance.NewTestInstanceGenerator(net, nil, nil) instances := sg.Instances(n) diff --git a/examples/go.mod b/examples/go.mod index 7b91ade2a..209668ac7 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -10,7 +10,6 @@ require ( github.com/ipld/go-car/v2 v2.13.1 github.com/ipld/go-ipld-prime v0.21.0 github.com/libp2p/go-libp2p v0.32.2 - github.com/libp2p/go-libp2p-routing-helpers v0.7.3 github.com/multiformats/go-multiaddr v0.12.1 github.com/multiformats/go-multicodec v0.9.0 github.com/prometheus/client_golang v1.18.0 @@ -88,6 +87,7 @@ require ( github.com/libp2p/go-libp2p-kad-dht v0.25.2 // indirect github.com/libp2p/go-libp2p-kbucket v0.6.3 // indirect github.com/libp2p/go-libp2p-record v0.2.0 // indirect + github.com/libp2p/go-libp2p-routing-helpers v0.7.3 // indirect github.com/libp2p/go-msgio v0.3.0 // indirect github.com/libp2p/go-nat v0.2.0 // indirect github.com/libp2p/go-netroute v0.2.1 // indirect diff --git a/examples/unixfs-file-cid/main.go b/examples/unixfs-file-cid/main.go index e1adad350..e20b64ec1 100644 --- a/examples/unixfs-file-cid/main.go +++ b/examples/unixfs-file-cid/main.go @@ -32,7 +32,6 @@ import ( unixfile "github.com/ipfs/boxo/ipld/unixfs/file" "github.com/ipfs/boxo/ipld/unixfs/importer/balanced" uih "github.com/ipfs/boxo/ipld/unixfs/importer/helpers" - routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" bsclient "github.com/ipfs/boxo/bitswap/client" bsnet "github.com/ipfs/boxo/bitswap/network" @@ -178,14 +177,14 @@ func startDataServer(ctx context.Context, h host.Host) (cid.Cid, *bsserver.Serve // Start listening on the Bitswap protocol // For this example we're not leveraging any content routing (DHT, IPNI, delegated routing requests, etc.) as we know the peer we are fetching from - n := bsnet.NewFromIpfsHost(h, routinghelpers.Null{}) + n := bsnet.NewFromIpfsHost(h) bswap := bsserver.New(ctx, n, bs) n.Start(bswap) return nd.Cid(), bswap, nil } func runClient(ctx context.Context, h host.Host, c cid.Cid, targetPeer string) ([]byte, error) { - n := bsnet.NewFromIpfsHost(h, routinghelpers.Null{}) + n := bsnet.NewFromIpfsHost(h) bswap := bsclient.New(ctx, n, blockstore.NewBlockstore(datastore.NewNullDatastore())) n.Start(bswap) defer bswap.Close() diff --git a/fetcher/helpers/block_visitor_test.go b/fetcher/helpers/block_visitor_test.go index 57d3e11ad..6276af1f6 100644 --- a/fetcher/helpers/block_visitor_test.go +++ b/fetcher/helpers/block_visitor_test.go @@ -11,7 +11,6 @@ import ( "github.com/ipfs/boxo/fetcher/helpers" bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice" "github.com/ipfs/boxo/fetcher/testutil" - mockrouting "github.com/ipfs/boxo/routing/mock" blocks "github.com/ipfs/go-block-format" delay "github.com/ipfs/go-ipfs-delay" "github.com/ipld/go-ipld-prime" @@ -44,7 +43,7 @@ func TestFetchGraphToBlocks(t *testing.T) { }) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() @@ -94,7 +93,7 @@ func TestFetchGraphToUniqueBlocks(t *testing.T) { }) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() diff --git a/fetcher/impl/blockservice/fetcher_test.go b/fetcher/impl/blockservice/fetcher_test.go index 5a0b071f4..ddbd0863c 100644 --- a/fetcher/impl/blockservice/fetcher_test.go +++ b/fetcher/impl/blockservice/fetcher_test.go @@ -16,7 +16,6 @@ import ( "github.com/ipfs/boxo/fetcher/helpers" bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice" "github.com/ipfs/boxo/fetcher/testutil" - mockrouting "github.com/ipfs/boxo/routing/mock" blocks "github.com/ipfs/go-block-format" delay "github.com/ipfs/go-ipfs-delay" "github.com/ipld/go-ipld-prime" @@ -38,7 +37,7 @@ func TestFetchIPLDPrimeNode(t *testing.T) { }) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() @@ -87,7 +86,7 @@ func TestFetchIPLDGraph(t *testing.T) { }) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() @@ -143,7 +142,7 @@ func TestFetchIPLDPath(t *testing.T) { }) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() @@ -207,7 +206,7 @@ func TestHelpers(t *testing.T) { }) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() @@ -321,7 +320,7 @@ func TestNodeReification(t *testing.T) { na.AssembleEntry("link4").AssignLink(link4) })) - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0*time.Millisecond)) + net := tn.VirtualNetwork(delay.Fixed(0 * time.Millisecond)) ig := testinstance.NewTestInstanceGenerator(net, nil, nil) defer ig.Close() diff --git a/ipld/merkledag/merkledag_test.go b/ipld/merkledag/merkledag_test.go index 4dcc0b9c5..3ce5838b9 100644 --- a/ipld/merkledag/merkledag_test.go +++ b/ipld/merkledag/merkledag_test.go @@ -21,7 +21,6 @@ import ( . "github.com/ipfs/boxo/ipld/merkledag" mdpb "github.com/ipfs/boxo/ipld/merkledag/pb" dstest "github.com/ipfs/boxo/ipld/merkledag/test" - mockrouting "github.com/ipfs/boxo/routing/mock" delay "github.com/ipfs/go-ipfs-delay" bserv "github.com/ipfs/boxo/blockservice" @@ -511,7 +510,7 @@ func TestCantGet(t *testing.T) { } func TestFetchGraph(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) + net := tn.VirtualNetwork(delay.Fixed(0)) sg := testinstance.NewTestInstanceGenerator(net, nil, nil) instances := sg.Instances(2) dservs := [2]ipld.DAGService{ @@ -553,7 +552,7 @@ func TestFetchGraphWithDepthLimit(t *testing.T) { } testF := func(t *testing.T, tc testcase) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) + net := tn.VirtualNetwork(delay.Fixed(0)) sg := testinstance.NewTestInstanceGenerator(net, nil, nil) instances := sg.Instances(2) dservs := [2]ipld.DAGService{