-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathindexerrouting.go
96 lines (86 loc) · 3.72 KB
/
indexerrouting.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package bitswaphelpers
import (
"context"
"sync"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-cid"
routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
)
// Compile-time assertion that *IndexerRouting satisfies routing.Routing.
var _ routing.Routing = (*IndexerRouting)(nil)
// IndexerRouting provides an interface that satisfies routing.Routing but only returns
// provider records from preset sets of providers registered per retrieval ID via AddProviders.
// Bitswap will potentially make multiple FindProvidersAsync requests, and the CID passed will not always be the root.
// As a result, provider sets are keyed by retrieval ID rather than by CID, and the toRetrievalIDs
// callback resolves a requested CID to the retrieval IDs whose providers should be served.
// NOTE(review): earlier wording said the retrieval ID is read from a context key; the code in this
// file goes through toRetrievalIDs instead.
// Also, while there is a delegated routing client that talks to the indexer, we use this because we run it on
// top of the processing we're doing at a higher level with multiprotocol filtering.
type IndexerRouting struct {
// Null supplies the routing.Routing methods that this type does not define itself.
routinghelpers.Null
// providerSets holds the candidates registered for each retrieval ID.
providerSets map[types.RetrievalID][]types.RetrievalCandidate
// providerSetsLk guards providerSets.
providerSetsLk sync.Mutex
// toRetrievalIDs resolves a CID to the retrieval IDs whose provider sets should be consulted.
toRetrievalIDs func(cid.Cid) []types.RetrievalID
}
// NewIndexerRouting constructs an IndexerRouting with an empty provider table.
// toRetrievalID is stored as the callback used to resolve a CID into the
// retrieval IDs whose providers are served for it.
func NewIndexerRouting(toRetrievalID func(cid.Cid) []types.RetrievalID) *IndexerRouting {
	ir := new(IndexerRouting)
	ir.toRetrievalIDs = toRetrievalID
	ir.providerSets = map[types.RetrievalID][]types.RetrievalCandidate{}
	return ir
}
// RemoveProviders drops every provider record stored under retrievalID.
// Removing an ID that has no records is a no-op.
func (ir *IndexerRouting) RemoveProviders(retrievalID types.RetrievalID) {
	lk := &ir.providerSetsLk
	lk.Lock()
	defer lk.Unlock()

	delete(ir.providerSets, retrievalID)
}
// AddProviders appends providers to the records already held for retrievalID.
// Duplicates within the supplied slice (same MinerPeer string form) are
// collapsed to their first occurrence before being stored; entries already
// stored from earlier calls are not consulted.
func (ir *IndexerRouting) AddProviders(retrievalID types.RetrievalID, providers []types.RetrievalCandidate) {
	// Dedup the incoming batch outside the lock so the critical section stays short.
	seen := make(map[string]struct{}, len(providers))
	deduped := make([]types.RetrievalCandidate, 0, len(providers))
	for i := range providers {
		key := providers[i].MinerPeer.String()
		if _, dup := seen[key]; dup {
			continue
		}
		seen[key] = struct{}{}
		deduped = append(deduped, providers[i])
	}

	ir.providerSetsLk.Lock()
	defer ir.providerSetsLk.Unlock()
	ir.providerSets[retrievalID] = append(ir.providerSets[retrievalID], deduped...)
}
// FindProvidersAsync returns providers for the retrieval IDs that toRetrievalIDs
// resolves from c. The lookup runs in its own goroutine; results are streamed on
// the returned channel, which is closed once all results are sent or ctx is done.
//
// Provider sets are consumed in the order of the resolved retrieval IDs. When
// max is positive, at most max providers are returned and the unreturned
// remainder of the set that crossed the limit is kept for a future call. Sets
// that are handed out in full are removed so the same records are not served
// again. A max of zero or less is treated as "no limit", which matches the
// libp2p ContentDiscovery contract for a zero count and avoids slicing with a
// negative bound.
//
// TODO: there is a slight risk that go-bitswap, which dedups requests by CID across multiple sessions,
// could accidentally read the wrong retrieval id if two retrievals were running at the same time. Not sure how much
// of a risk this really is, because when requests are deduped, both calls still receive the results. See go-bitswap
// ProviderQueryManager for more specifics.
func (ir *IndexerRouting) FindProvidersAsync(ctx context.Context, c cid.Cid, max int) <-chan peer.AddrInfo {
	resultsChan := make(chan peer.AddrInfo)
	go func() {
		defer close(resultsChan)

		// Resolve IDs before taking the lock; the callback is external code.
		retrievalIDs := ir.toRetrievalIDs(c)

		ir.providerSetsLk.Lock()
		var providers []types.RetrievalCandidate
		for _, retrievalID := range retrievalIDs {
			providers = append(providers, ir.providerSets[retrievalID]...)
			if max > 0 && len(providers) > max {
				// Everything past max belongs to the current set (earlier sets
				// fit under the limit), so store it back as the leftover. The
				// three-index slice caps the returned head so it can never
				// grow into the stored tail.
				providers, ir.providerSets[retrievalID] = providers[:max:max], providers[max:]
				break
			}
			// This set has been handed out in full; drop it so it is not
			// returned again on the next call.
			delete(ir.providerSets, retrievalID)
		}
		ir.providerSetsLk.Unlock()

		logger.Debugw("provider records requested from bitswap, sending back indexer results", "providerCount", len(providers))
		for _, p := range providers {
			select {
			case <-ctx.Done():
				return
			case resultsChan <- p.MinerPeer:
			}
		}
	}()
	return resultsChan
}